Commit aaed039e
Authored 12 years ago by Denny
Parent: 0ee44c22

Merged standalone and mesos EC2 scripts
Showing 3 changed files with 57 additions and 516 deletions:

  ec2/spark-ec2-standalone     +1  -1
  ec2/spark_ec2.py             +56 -24
  ec2/spark_ec2_standalone.py  +0  -491
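The substance of the commit: the former spark_ec2_standalone.py launcher is folded into spark_ec2.py, which now dispatches on a new --cluster-type option (plus a --user option, since the standalone AMIs log in as ec2-user rather than root). A minimal sketch of that dispatch pattern, with hypothetical setup helpers standing in for the real ones in the hunks below:

    # Sketch only: dispatch on --cluster-type, as the merged script does.
    # setup_mesos()/setup_standalone() are hypothetical stand-ins here.
    from optparse import OptionParser

    def setup_mesos():
        return "mesos cluster, master URL on port 5050"

    def setup_standalone():
        return "standalone spark cluster, master URL on port 7077"

    parser = OptionParser()
    parser.add_option("-c", "--cluster-type", default="mesos")
    (opts, args) = parser.parse_args(["--cluster-type", "standalone"])

    if opts.cluster_type == "mesos":
        print(setup_mesos())
    elif opts.cluster_type == "standalone":
        print(setup_standalone())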
ec2/spark-ec2-standalone  +1 -1

@@ -17,4 +17,4 @@
 # limitations under the License.
 
 cd "`dirname $0`"
-PYTHONPATH="./third_party/boto-2.4.1.zip/boto-2.4.1:$PYTHONPATH" python ./spark_ec2_standalone.py $@
+PYTHONPATH="./third_party/boto-2.4.1.zip/boto-2.4.1:$PYTHONPATH" python ./spark_ec2.py --user ec2-user --cluster-type standalone -a standalone $@
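The wrapper is now a one-line alias: it forwards its arguments to spark_ec2.py with standalone defaults baked in. Re-expressed in Python for illustration only (the flags are taken from the line above; the PYTHONPATH setup for the bundled boto is omitted):

    # Sketch: what the shell wrapper does, as a subprocess call.
    import subprocess
    import sys

    cmd = ["python", "./spark_ec2.py",
           "--user", "ec2-user",            # standalone AMIs use ec2-user, not root
           "--cluster-type", "standalone",
           "-a", "standalone"]              # resolve the latest standalone AMI
    subprocess.check_call(cmd + sys.argv[1:])  # forward the caller's args ($@)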
ec2/spark_ec2.py  +56 -24

@@ -36,6 +36,7 @@ from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
 # A static URL from which to figure out the latest Mesos EC2 AMI
 LATEST_AMI_URL = "https://s3.amazonaws.com/mesos-images/ids/latest-spark-0.5"
+LATEST_STANDALONE_AMI_URL = "https://s3.amazonaws.com/spark-standalone-amis/latest-spark"
 
 # Configure and parse our command-line arguments
@@ -62,7 +63,8 @@ def parse_args():
       help="Availability zone to launch instances in")
   parser.add_option("-a", "--ami", default="latest",
       help="Amazon Machine Image ID to use, or 'latest' to use latest " +
-           "available AMI (default: latest)")
+           "available mesos AMI, 'standalone' for the latest available " +
+           "standalone AMI (default: latest)")
   parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
       help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
            "the given local address (for use with login)")
@@ -78,6 +80,11 @@ def parse_args():
   parser.add_option("--spot-price", metavar="PRICE", type="float",
       help="If specified, launch slaves as spot instances with the given " +
            "maximum price (in dollars)")
+  parser.add_option("-c", "--cluster-type", default="mesos",
+      help="'mesos' for a mesos cluster, 'standalone' for a standalone " +
+           "spark cluster (default: mesos)")
+  parser.add_option("-u", "--user", default="root",
+      help="The ssh user you want to connect as (default: root)")
   (opts, args) = parser.parse_args()
   if len(args) != 2:
     parser.print_help()
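Both new options default to the old behavior (a mesos cluster, logging in as root), so existing invocations are unaffected. A self-contained sketch of how optparse handles them, with the defaults from the hunk above:

    # Sketch: the two new options parsed in isolation.
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("-c", "--cluster-type", default="mesos")
    parser.add_option("-u", "--user", default="root")

    (opts, _) = parser.parse_args([])  # no flags: old behavior preserved
    assert (opts.cluster_type, opts.user) == ("mesos", "root")

    (opts, _) = parser.parse_args(["-c", "standalone", "-u", "ec2-user"])
    assert (opts.cluster_type, opts.user) == ("standalone", "ec2-user")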
@@ -87,6 +94,9 @@ def parse_args():
     print >> stderr, ("ERROR: The -i or --identity-file argument is " +
                       "required for " + action)
     sys.exit(1)
+  if opts.cluster_type not in ["mesos", "standalone"] and action == "launch":
+    print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type)
+    sys.exit(1)
   if os.getenv('AWS_ACCESS_KEY_ID') == None:
     print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
                       "must be set")
@@ -106,7 +116,7 @@ def get_or_make_group(conn, name):
     return group[0]
   else:
     print "Creating security group " + name
-    return conn.create_security_group(name, "Mesos EC2 group")
+    return conn.create_security_group(name, "Spark EC2 group")
 
 # Wait for a set of launched instances to exit the "pending" state
@@ -144,20 +154,22 @@ def launch_cluster(conn, opts, cluster_name):
     master_group.authorize(src_group=zoo_group)
     master_group.authorize('tcp', 22, 22, '0.0.0.0/0')
     master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
-    master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
-    master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
-    master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
-    master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0')
+    if opts.cluster_type == "mesos":
+      master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
+      master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
+      master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
+      master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0')
   if slave_group.rules == []: # Group was just now created
     slave_group.authorize(src_group=master_group)
     slave_group.authorize(src_group=slave_group)
     slave_group.authorize(src_group=zoo_group)
     slave_group.authorize('tcp', 22, 22, '0.0.0.0/0')
     slave_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
-    slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
-    slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
-    slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
-    slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
+    if opts.cluster_type == "mesos":
+      slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
+      slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
+      slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
+      slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
   if zoo_group.rules == []: # Group was just now created
     zoo_group.authorize(src_group=master_group)
     zoo_group.authorize(src_group=slave_group)
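The ports used by the Hadoop and Mesos web UIs (50030, 50070, 60070, 38090 on the master; 50060, 50075, 60060, 60075 on the slaves) are now opened only for mesos clusters; both cluster types still get ssh (22) and the Spark web UI range (8080-8081). The rule, restated as a hedged table-driven sketch rather than the script's literal code:

    # Sketch: extra ingress ports per cluster type, per the hunk above.
    EXTRA_PORTS = {
        "mesos":      {"master": [50030, 50070, 60070, 38090],
                       "slaves": [50060, 50075, 60060, 60075]},
        "standalone": {"master": [], "slaves": []},
    }

    def ports_to_open(cluster_type, role):
        # 22 = ssh, 8080-8081 = Spark web UI, opened for every cluster type
        return [22, 8080, 8081] + EXTRA_PORTS[cluster_type][role]

    print(ports_to_open("standalone", "master"))  # [22, 8080, 8081]
    print(ports_to_open("mesos", "master"))       # adds the Hadoop/Mesos UIs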
@@ -179,13 +191,19 @@ def launch_cluster(conn, opts, cluster_name):
             "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
         sys.exit(1)
 
-  if opts.ami == "latest":
+  if opts.ami in ["latest", "standalone"]:
     # Figure out the latest AMI from our static URL
+    if opts.ami == "latest":
+      url = LATEST_AMI_URL
+    elif opts.ami == "standalone":
+      url = LATEST_STANDALONE_AMI_URL
     try:
-      opts.ami = urllib2.urlopen(LATEST_AMI_URL).read().strip()
+      opts.ami = urllib2.urlopen(url).read().strip()
       print "Latest Spark AMI: " + opts.ami
     except:
-      print >> stderr, "Could not read " + LATEST_AMI_URL
+      print >> stderr, "Could not read " + url
 
   print "Launching instances..."
@@ -314,14 +332,25 @@ def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_key):
   master = master_nodes[0].public_dns_name
   if deploy_ssh_key:
     print "Copying SSH key %s to master..." % opts.identity_file
-    ssh(master, opts, 'mkdir -p /root/.ssh')
-    scp(master, opts, opts.identity_file, '/root/.ssh/id_rsa')
+    ssh(master, opts, 'mkdir -p ~/.ssh')
+    scp(master, opts, opts.identity_file, '~/.ssh/id_rsa')
   print "Running setup on master..."
+  if opts.cluster_type == "mesos":
+    setup_mesos_cluster(master, opts)
+  elif opts.cluster_type == "standalone":
+    setup_standalone_cluster(master, slave_nodes, opts)
+  print "Done!"
+
+def setup_mesos_cluster(master, opts):
   ssh(master, opts, "chmod u+x mesos-ec2/setup")
   ssh(master, opts, "mesos-ec2/setup %s %s %s %s" %
       ("generic", "none", "master", opts.swap))
-  print "Done!"
+
+def setup_standalone_cluster(master, slave_nodes, opts):
+  slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes])
+  ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips))
+  ssh(master, opts, "/home/ec2-user/spark/bin/start-all.sh")
 
 # Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
 def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes, zoo_nodes):
@@ -380,9 +409,12 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
     zoo_list = '\n'.join([i.public_dns_name for i in zoo_nodes])
     cluster_url = "zoo://" + ",".join(
         ["%s:2181/mesos" % i.public_dns_name for i in zoo_nodes])
-  else:
+  elif opts.cluster_type == "mesos":
     zoo_list = "NONE"
     cluster_url = "%s:5050" % active_master
+  elif opts.cluster_type == "standalone":
+    zoo_list = "NONE"
+    cluster_url = "%s:7077" % active_master
 
   template_vars = {
     "master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
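The cluster URL baked into the deploy templates now depends on the cluster type: a mesos master is addressed on port 5050, a standalone spark master on 7077 (the zoo:// form covers the ZooKeeper case above). As a compact sketch:

    # Sketch: cluster_url selection, ports as in the hunk above.
    def cluster_url_for(cluster_type, active_master):
        port = {"mesos": 5050, "standalone": 7077}[cluster_type]
        return "%s:%d" % (active_master, port)

    print(cluster_url_for("mesos", "ec2-master.example"))       # ...:5050
    print(cluster_url_for("standalone", "ec2-master.example"))  # ...:7077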
@@ -416,7 +448,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
       dest.close()
 
   # rsync the whole directory over to the master machine
   command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
-      "'%s/' 'root@%s:/'") % (opts.identity_file, tmp_dir, active_master))
+      "'%s/' '%s@%s:~'") % (opts.identity_file, tmp_dir, opts.user, active_master))
   subprocess.check_call(command, shell=True)
   # Remove the temp directory we created above
   shutil.rmtree(tmp_dir)
@@ -425,15 +457,15 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
 
 # Copy a file to a given host through scp, throwing an exception if scp fails
 def scp(host, opts, local_file, dest_file):
   subprocess.check_call(
-      "scp -q -o StrictHostKeyChecking=no -i %s '%s' 'root@%s:%s'" %
-      (opts.identity_file, local_file, host, dest_file), shell=True)
+      "scp -q -o StrictHostKeyChecking=no -i %s '%s' '%s@%s:%s'" %
+      (opts.identity_file, local_file, opts.user, host, dest_file), shell=True)
 
 # Run a command on a host through ssh, throwing an exception if ssh fails
 def ssh(host, opts, command):
   subprocess.check_call(
-      "ssh -t -o StrictHostKeyChecking=no -i %s root@%s '%s'" %
-      (opts.identity_file, host, command), shell=True)
+      "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
+      (opts.identity_file, opts.user, host, command), shell=True)
 
 def main():
@@ -480,8 +512,8 @@ def main():
     proxy_opt = ""
     if opts.proxy_port != None:
       proxy_opt = "-D " + opts.proxy_port
-    subprocess.check_call("ssh -o StrictHostKeyChecking=no -i %s %s root@%s" %
-        (opts.identity_file, proxy_opt, master), shell=True)
+    subprocess.check_call("ssh -o StrictHostKeyChecking=no -i %s %s %s@%s" %
+        (opts.identity_file, proxy_opt, opts.user, master), shell=True)
   elif action == "get-master":
     (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(conn, opts, cluster_name)
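With --user threaded through, every remote command (scp, ssh, and the login action's interactive ssh) uses opts.user in place of the hard-coded root. A sketch of the string formatting alone, with a hypothetical stand-in for the parsed options (nothing is executed):

    # Sketch: the user-parameterized ssh command line, formatted but not run.
    class Opts(object):  # hypothetical stand-in for optparse's result object
        identity_file = "key.pem"
        user = "ec2-user"

    opts = Opts()
    host, command = "ec2-master.example", "uptime"
    print("ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
          (opts.identity_file, opts.user, host, command))
    # -> ssh -t -o StrictHostKeyChecking=no -i key.pem ec2-user@ec2-master.example 'uptime'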
ec2/spark_ec2_standalone.py  deleted (mode 100755 -> 0)  +0 -491
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import with_statement

import boto
import logging
import os
import random
import shutil
import subprocess
import sys
import tempfile
import time
import urllib2
from optparse import OptionParser
from sys import stderr
from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType

# A static URL from which to figure out the latest Mesos EC2 AMI
LATEST_AMI_URL = "https://s3.amazonaws.com/spark-standalone-amis/latest-spark"

# Configure and parse our command-line arguments
def parse_args():
  parser = OptionParser(usage="spark-ec2-standalone [options] <action> <cluster_name>"
      + "\n\n<action> can be: launch, destroy, login, stop, start, get-master",
      add_help_option=False)
  parser.add_option("-h", "--help", action="help",
                    help="Show this help message and exit")
  parser.add_option("-s", "--slaves", type="int", default=1,
      help="Number of slaves to launch (default: 1)")
  parser.add_option("-w", "--wait", type="int", default=120,
      help="Seconds to wait for nodes to start (default: 120)")
  parser.add_option("-k", "--key-pair",
      help="Key pair to use on instances")
  parser.add_option("-i", "--identity-file",
      help="SSH private key file to use for logging into instances")
  parser.add_option("-t", "--instance-type", default="m1.large",
      help="Type of instance to launch (default: m1.large). " +
           "WARNING: must be 64-bit; small instances won't work")
  parser.add_option("-m", "--master-instance-type", default="",
      help="Master instance type (leave empty for same as instance-type)")
  parser.add_option("-z", "--zone", default="us-east-1b",
      help="Availability zone to launch instances in")
  parser.add_option("-a", "--ami", default="latest",
      help="Amazon Machine Image ID to use, or 'latest' to use latest " +
           "available AMI (default: latest)")
  parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
      help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
           "the given local address (for use with login)")
  parser.add_option("--resume", action="store_true", default=False,
      help="Resume installation on a previously launched cluster " +
           "(for debugging)")
  parser.add_option("--ebs-vol-size", metavar="SIZE", type="int", default=0,
      help="Attach a new EBS volume of size SIZE (in GB) to each node as " +
           "/vol. The volumes will be deleted when the instances terminate. " +
           "Only possible on EBS-backed AMIs.")
  parser.add_option("--swap", metavar="SWAP", type="int", default=1024,
      help="Swap space to set up per node, in MB (default: 1024)")
  parser.add_option("--spot-price", metavar="PRICE", type="float",
      help="If specified, launch slaves as spot instances with the given " +
           "maximum price (in dollars)")
  (opts, args) = parser.parse_args()
  if len(args) != 2:
    parser.print_help()
    sys.exit(1)
  (action, cluster_name) = args
  if opts.identity_file == None and action in ['launch', 'login']:
    print >> stderr, ("ERROR: The -i or --identity-file argument is " +
                      "required for " + action)
    sys.exit(1)
  if os.getenv('AWS_ACCESS_KEY_ID') == None:
    print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
                      "must be set")
    sys.exit(1)
  if os.getenv('AWS_SECRET_ACCESS_KEY') == None:
    print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " +
                      "must be set")
    sys.exit(1)
  return (opts, action, cluster_name)

# Get the EC2 security group of the given name, creating it if it doesn't exist
def get_or_make_group(conn, name):
  groups = conn.get_all_security_groups()
  group = [g for g in groups if g.name == name]
  if len(group) > 0:
    return group[0]
  else:
    print "Creating security group " + name
    return conn.create_security_group(name, "Spark EC2 group")

# Wait for a set of launched instances to exit the "pending" state
# (i.e. either to start running or to fail and be terminated)
def wait_for_instances(conn, instances):
  while True:
    for i in instances:
      i.update()
    if len([i for i in instances if i.state == 'pending']) > 0:
      time.sleep(5)
    else:
      return

# Check whether a given EC2 instance object is in a state we consider active,
# i.e. not terminating or terminated. We count both stopping and stopped as
# active since we can restart stopped clusters.
def is_active(instance):
  return (instance.state in ['pending', 'running', 'stopping', 'stopped'])

# Launch a cluster of the given name, by setting up its security groups,
# and then starting new instances in them.
# Returns a tuple of EC2 reservation objects for the master and slave
# instances (in that order).
# Fails if there are already instances running in the cluster's groups.
def launch_cluster(conn, opts, cluster_name):
  print "Setting up security groups..."
  master_group = get_or_make_group(conn, cluster_name + "-master")
  slave_group = get_or_make_group(conn, cluster_name + "-slaves")
  if master_group.rules == []: # Group was just now created
    master_group.authorize(src_group=master_group)
    master_group.authorize(src_group=slave_group)
    master_group.authorize('tcp', 22, 22, '0.0.0.0/0')
    master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
    master_group.authorize('tcp', 7077, 7077, '0.0.0.0/0')
  if slave_group.rules == []: # Group was just now created
    slave_group.authorize(src_group=master_group)
    slave_group.authorize(src_group=slave_group)
    slave_group.authorize('tcp', 22, 22, '0.0.0.0/0')
    slave_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')

  # Check if instances are already running in our groups
  print "Checking for running cluster..."
  reservations = conn.get_all_instances()
  for res in reservations:
    group_names = [g.id for g in res.groups]
    if master_group.name in group_names or slave_group.name in group_names:
      active = [i for i in res.instances if is_active(i)]
      if len(active) > 0:
        print >> stderr, ("ERROR: There are already instances running in " +
            "group %s or %s" % (master_group.name, slave_group.name))
        sys.exit(1)
  if opts.ami == "latest":
    # Figure out the latest AMI from our static URL
    try:
      opts.ami = urllib2.urlopen(LATEST_AMI_URL).read().strip()
      print "Latest Spark AMI: " + opts.ami
    except:
      print >> stderr, "Could not read " + LATEST_AMI_URL

  print "Launching instances..."
  try:
    image = conn.get_all_images(image_ids=[opts.ami])[0]
  except:
    print >> stderr, "Could not find AMI " + opts.ami
    sys.exit(1)

  # Create block device mapping so that we can add an EBS volume if asked to
  block_map = BlockDeviceMapping()
  if opts.ebs_vol_size > 0:
    device = EBSBlockDeviceType()
    device.size = opts.ebs_vol_size
    device.delete_on_termination = True
    block_map["/dev/sdv"] = device

  # Launch slaves
  if opts.spot_price != None:
    # Launch spot instances with the requested price
    print ("Requesting %d slaves as spot instances with price $%.3f" %
           (opts.slaves, opts.spot_price))
    slave_reqs = conn.request_spot_instances(
        price = opts.spot_price,
        image_id = opts.ami,
        launch_group = "launch-group-%s" % cluster_name,
        placement = opts.zone,
        count = opts.slaves,
        key_name = opts.key_pair,
        security_groups = [slave_group],
        instance_type = opts.instance_type,
        block_device_map = block_map)
    my_req_ids = [req.id for req in slave_reqs]
    print "Waiting for spot instances to be granted..."
    while True:
      time.sleep(10)
      reqs = conn.get_all_spot_instance_requests()
      id_to_req = {}
      for r in reqs:
        id_to_req[r.id] = r
      active = 0
      instance_ids = []
      for i in my_req_ids:
        if id_to_req[i].state == "active":
          active += 1
          instance_ids.append(id_to_req[i].instance_id)
      if active == opts.slaves:
        print "All %d slaves granted" % opts.slaves
        reservations = conn.get_all_instances(instance_ids)
        slave_nodes = []
        for r in reservations:
          slave_nodes += r.instances
        break
      else:
        print "%d of %d slaves granted, waiting longer" % (active, opts.slaves)
  else:
    # Launch non-spot instances
    slave_res = image.run(key_name = opts.key_pair,
                          security_groups = [slave_group],
                          instance_type = opts.instance_type,
                          placement = opts.zone,
                          min_count = opts.slaves,
                          max_count = opts.slaves,
                          block_device_map = block_map)
    slave_nodes = slave_res.instances
    print "Launched slaves, regid = " + slave_res.id

  # Launch masters
  master_type = opts.master_instance_type
  if master_type == "":
    master_type = opts.instance_type
  master_res = image.run(key_name = opts.key_pair,
                         security_groups = [master_group],
                         instance_type = master_type,
                         placement = opts.zone,
                         min_count = 1,
                         max_count = 1,
                         block_device_map = block_map)
  master_nodes = master_res.instances
  print "Launched master, regid = " + master_res.id

  # Return all the instances
  return (master_nodes, slave_nodes)

# Get the EC2 instances in an existing cluster if available.
# Returns a tuple of lists of EC2 instance objects for the masters and
# slaves (in that order).
def get_existing_cluster(conn, opts, cluster_name):
  print "Searching for existing cluster " + cluster_name + "..."
  reservations = conn.get_all_instances()
  master_nodes = []
  slave_nodes = []
  for res in reservations:
    active = [i for i in res.instances if is_active(i)]
    if len(active) > 0:
      group_names = [g.name for g in res.groups]
      if group_names == [cluster_name + "-master"]:
        master_nodes += res.instances
      elif group_names == [cluster_name + "-slaves"]:
        slave_nodes += res.instances
  if master_nodes != [] and slave_nodes != []:
    print ("Found %d master(s), %d slaves" %
           (len(master_nodes), len(slave_nodes)))
    return (master_nodes, slave_nodes)
  else:
    if master_nodes == [] and slave_nodes != []:
      print "ERROR: Could not find master in group " + cluster_name + "-master"
    elif master_nodes != [] and slave_nodes == []:
      print "ERROR: Could not find slaves in group " + cluster_name + "-slaves"
    else:
      print "ERROR: Could not find any existing cluster"
    sys.exit(1)

# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
  print "Deploying files to master..."
  # deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes)
  master = master_nodes[0].public_dns_name
  if deploy_ssh_key:
    print "Copying SSH key %s to master..." % opts.identity_file
    ssh(master, opts, 'mkdir -p /home/ec2-user/.ssh')
    scp(master, opts, opts.identity_file, '/home/ec2-user/.ssh/id_rsa')
  slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes])
  ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips))
  ssh(master, opts, "/home/ec2-user/spark/bin/start-all.sh")
  print "Done!"

# Wait for a whole cluster (masters, slaves) to start up
def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes):
  print "Waiting for instances to start up..."
  time.sleep(5)
  wait_for_instances(conn, master_nodes)
  wait_for_instances(conn, slave_nodes)
  print "Waiting %d more seconds..." % wait_secs
  time.sleep(wait_secs)

# Get number of local disks available for a given EC2 instance type.
def get_num_disks(instance_type):
  # From http://docs.amazonwebservices.com/AWSEC2/latest/UserGuide/index.html?InstanceStorage.html
  disks_by_instance = {
    "m1.small":    1,
    "m1.large":    2,
    "m1.xlarge":   4,
    "t1.micro":    1,
    "c1.medium":   1,
    "c1.xlarge":   4,
    "m2.xlarge":   1,
    "m2.2xlarge":  1,
    "m2.4xlarge":  2,
    "cc1.4xlarge": 2,
    "cc2.8xlarge": 4,
    "cg1.4xlarge": 2
  }
  if instance_type in disks_by_instance:
    return disks_by_instance[instance_type]
  else:
    print >> stderr, ("WARNING: Don't know number of disks on instance type %s; assuming 1"
                      % instance_type)
    return 1

# Deploy the configuration file templates in a given local directory to
# a cluster, filling in any template parameters with information about the
# cluster (e.g. lists of masters and slaves). Files are only deployed to
# the first master instance in the cluster, and we expect the setup
# script to be run on that instance to copy them to other nodes.
def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes):
  active_master = master_nodes[0].public_dns_name

  num_disks = get_num_disks(opts.instance_type)
  hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
  mapred_local_dirs = "/mnt/hadoop/mrlocal"
  if num_disks > 1:
    for i in range(2, num_disks + 1):
      hdfs_data_dirs += ",/mnt%d/ephemeral-hdfs/data" % i
      mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i

  cluster_url = "%s:7077" % active_master

  template_vars = {
    "master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
    "active_master": active_master,
    "slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
    "cluster_url": cluster_url,
    "hdfs_data_dirs": hdfs_data_dirs,
    "mapred_local_dirs": mapred_local_dirs
  }

  # Create a temp directory in which we will place all the files to be
  # deployed after we substitute template parameters in them
  tmp_dir = tempfile.mkdtemp()
  for path, dirs, files in os.walk(root_dir):
    if path.find(".svn") == -1:
      dest_dir = os.path.join('/', path[len(root_dir):])
      local_dir = tmp_dir + dest_dir
      if not os.path.exists(local_dir):
        os.makedirs(local_dir)
      for filename in files:
        if filename[0] not in '#.~' and filename[-1] != '~':
          dest_file = os.path.join(dest_dir, filename)
          local_file = tmp_dir + dest_file
          with open(os.path.join(path, filename)) as src:
            with open(local_file, "w") as dest:
              text = src.read()
              for key in template_vars:
                text = text.replace("{{" + key + "}}", template_vars[key])
              dest.write(text)
              dest.close()

  # rsync the whole directory over to the master machine
  command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
      "'%s/' 'ec2-user@%s:/'") % (opts.identity_file, tmp_dir, active_master))
  subprocess.check_call(command, shell=True)
  # Remove the temp directory we created above
  shutil.rmtree(tmp_dir)

# Copy a file to a given host through scp, throwing an exception if scp fails
def scp(host, opts, local_file, dest_file):
  subprocess.check_call(
      "scp -q -o StrictHostKeyChecking=no -i %s '%s' 'ec2-user@%s:%s'" %
      (opts.identity_file, local_file, host, dest_file), shell=True)

# Run a command on a host through ssh, throwing an exception if ssh fails
def ssh(host, opts, command):
  subprocess.check_call(
      "ssh -t -o StrictHostKeyChecking=no -i %s ec2-user@%s '%s'" %
      (opts.identity_file, host, command), shell=True)

def main():
  (opts, action, cluster_name) = parse_args()
  conn = boto.connect_ec2()

  # Select an AZ at random if it was not specified.
  if opts.zone == "":
    opts.zone = random.choice(conn.get_all_zones()).name

  if action == "launch":
    if opts.resume:
      (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
    else:
      (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
      wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
    setup_cluster(conn, master_nodes, slave_nodes, opts, True)

  elif action == "destroy":
    response = raw_input("Are you sure you want to destroy the cluster " +
        cluster_name + "?\nALL DATA ON ALL NODES WILL BE LOST!!\n" +
        "Destroy cluster " + cluster_name + " (y/N): ")
    if response == "y":
      (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
      print "Terminating master..."
      for inst in master_nodes:
        inst.terminate()
      print "Terminating slaves..."
      for inst in slave_nodes:
        inst.terminate()

  elif action == "login":
    (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
    master = master_nodes[0].public_dns_name
    print "Logging into master " + master + "..."
    proxy_opt = ""
    if opts.proxy_port != None:
      proxy_opt = "-D " + opts.proxy_port
    subprocess.check_call("ssh -o StrictHostKeyChecking=no -i %s %s ec2-user@%s" %
        (opts.identity_file, proxy_opt, master), shell=True)

  elif action == "get-master":
    (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
    print master_nodes[0].public_dns_name

  elif action == "stop":
    response = raw_input("Are you sure you want to stop the cluster " +
        cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
        "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
        "AMAZON EBS IF IT IS EBS-BACKED!!\n" +
        "Stop cluster " + cluster_name + " (y/N): ")
    if response == "y":
      (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
      print "Stopping master..."
      for inst in master_nodes:
        if inst.state not in ["shutting-down", "terminated"]:
          inst.stop()
      print "Stopping slaves..."
      for inst in slave_nodes:
        if inst.state not in ["shutting-down", "terminated"]:
          inst.stop()

  elif action == "start":
    (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
    print "Starting slaves..."
    for inst in slave_nodes:
      if inst.state not in ["shutting-down", "terminated"]:
        inst.start()
    print "Starting master..."
    for inst in master_nodes:
      if inst.state not in ["shutting-down", "terminated"]:
        inst.start()
    wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
    setup_cluster(conn, master_nodes, slave_nodes, opts, False)

  else:
    print >> stderr, "Invalid action: %s" % action
    sys.exit(1)

if __name__ == "__main__":
  logging.basicConfig()
  main()