Commit 1fd5ee32
Authored 12 years ago by Charles Reiss
Parent: 7f514587

Code review changes: add sc.stop; style of multiline comments; parens on procedure calls.
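The three conventions named in the commit message can be illustrated with a minimal, self-contained Scala sketch. `FakeContext` and `StyleSketch` are hypothetical names invented for this illustration and are not part of Spark or of this commit; the sketch only assumes the common Scala convention that side-effecting zero-argument methods are declared and called with parentheses.

```scala
import scala.collection.mutable.ArrayBuffer

/**
 * Hypothetical stand-in for a resource that a test suite must release.
 * The comment opener sits on its own line, matching the new side of the diff.
 */
class FakeContext {
  val events = ArrayBuffer[String]()

  // Side-effecting, zero-argument method: declared with parentheses.
  def stop(): Unit = { events += "stopped" }

  // Pure accessor: declared without parentheses.
  def isStopped: Boolean = events.contains("stopped")
}

object StyleSketch {
  // Mimics a test teardown step that releases the context after use.
  def run(): Seq[String] = {
    val ctx = new FakeContext
    ctx.stop() // called with parentheses because it has a side effect
    ctx.events.toSeq
  }
}
```

Calling `StyleSketch.run()` records a single "stopped" event, which is the kind of cleanup the added `sc.stop()` line performs in the suite's teardown.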
core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
47 additions, 22 deletions
@@ -31,7 +31,7 @@ import spark.TaskEndReason
 import spark.{FetchFailed, Success}

 /**
- * Tests for DAGScheduler. These tests directly call the event processing functinos in DAGScheduler
+ * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
  * rather than spawning an event loop thread as happens in the real code. They use EasyMock
  * to mock out two classes that DAGScheduler interacts with: TaskScheduler (to which TaskSets are
  * submitted) and BlockManagerMaster (from which cache locations are retrieved and to which dead
@@ -56,29 +56,34 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
   var schedulerThread: Thread = null
   var schedulerException: Throwable = null

-  /** Set of EasyMock argument matchers that match a TaskSet for a given RDD.
+  /**
+   * Set of EasyMock argument matchers that match a TaskSet for a given RDD.
    * We cache these so we do not create duplicate matchers for the same RDD.
    * This allows us to easily setup a sequence of expectations for task sets for
    * that RDD.
    */
   val taskSetMatchers = new HashMap[MyRDD, IArgumentMatcher]

-  /** Set of cache locations to return from our mock BlockManagerMaster.
+  /**
+   * Set of cache locations to return from our mock BlockManagerMaster.
    * Keys are (rdd ID, partition ID). Anything not present will return an empty
    * list of cache locations silently.
    */
   val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]

-  /** JobWaiter for the last JobSubmitted event we pushed. To keep tests (most of which
+  /**
+   * JobWaiter for the last JobSubmitted event we pushed. To keep tests (most of which
    * will only submit one job) from needing to explicitly track it.
    */
   var lastJobWaiter: JobWaiter = null

-  /** Tell EasyMockSugar what mock objects we want to be configured by expecting {...}
+  /**
+   * Tell EasyMockSugar what mock objects we want to be configured by expecting {...}
    * and whenExecuting {...} */
   implicit val mocks = MockObjects(taskScheduler, blockManagerMaster)

-  /** Utility function to reset mocks and set expectations on them. EasyMock wants mock objects
+  /**
+   * Utility function to reset mocks and set expectations on them. EasyMock wants mock objects
    * to be reset after each time their expectations are set, and we tend to check mock object
    * calls over a single call to DAGScheduler.
    *
@@ -115,17 +120,21 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     whenExecuting {
       scheduler.stop()
     }
+    sc.stop()
     System.clearProperty("spark.master.port")
   }

   def makeBlockManagerId(host: String): BlockManagerId =
     BlockManagerId("exec-" + host, host, 12345)

-  /** Type of RDD we use for testing. Note that we should never call the real RDD compute methods.
-   * This is a pair RDD type so it can always be used in ShuffleDependencies. */
+  /**
+   * Type of RDD we use for testing. Note that we should never call the real RDD compute methods.
+   * This is a pair RDD type so it can always be used in ShuffleDependencies.
+   */
   type MyRDD = RDD[(Int, Int)]

-  /** Create an RDD for passing to DAGScheduler. These RDDs will use the dependencies and
+  /**
+   * Create an RDD for passing to DAGScheduler. These RDDs will use the dependencies and
    * preferredLocations (if any) that are passed to them. They are deliberately not executable
    * so we can test that DAGScheduler does not try to execute RDDs locally.
    */
@@ -150,7 +159,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     }
   }

-  /** EasyMock matcher method. For use as an argument matcher for a TaskSet whose first task
+  /**
+   * EasyMock matcher method. For use as an argument matcher for a TaskSet whose first task
    * is from a particular RDD.
    */
   def taskSetForRdd(rdd: MyRDD): TaskSet = {
@@ -172,7 +182,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     return null
   }

-  /** Setup an EasyMock expectation to repsond to blockManagerMaster.getLocations() called from
+  /**
+   * Setup an EasyMock expectation to repsond to blockManagerMaster.getLocations() called from
    * cacheLocations.
    */
   def expectGetLocations(): Unit = {
@@ -197,7 +208,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     }).anyTimes()
   }

-  /** Process the supplied event as if it were the top of the DAGScheduler event queue, expecting
+  /**
+   * Process the supplied event as if it were the top of the DAGScheduler event queue, expecting
    * the scheduler not to exit.
    *
    * After processing the event, submit waiting stages as is done on most iterations of the
@@ -208,7 +220,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     scheduler.submitWaitingStages()
   }

-  /** Expect a TaskSet for the specified RDD to be submitted to the TaskScheduler. Should be
+  /**
+   * Expect a TaskSet for the specified RDD to be submitted to the TaskScheduler. Should be
    * called from a resetExpecting { ... } block.
    *
    * Returns a easymock Capture that will contain the task set after the stage is submitted.
@@ -220,7 +233,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     return taskSetCapture
   }

-  /** Expect the supplied code snippet to submit a stage for the specified RDD.
+  /**
+   * Expect the supplied code snippet to submit a stage for the specified RDD.
    * Return the resulting TaskSet. First marks all the tasks are belonging to the
    * current MapOutputTracker generation.
    */
@@ -239,7 +253,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     return taskSet
   }

-  /** Send the given CompletionEvent messages for the tasks in the TaskSet. */
+  /**
+   * Send the given CompletionEvent messages for the tasks in the TaskSet.
+   */
   def respondToTaskSet(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) {
     assert(taskSet.tasks.size >= results.size)
     for ((result, i) <- results.zipWithIndex) {
@@ -249,7 +265,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     }
   }

-  /** Assert that the supplied TaskSet has exactly the given preferredLocations. */
+  /**
+   * Assert that the supplied TaskSet has exactly the given preferredLocations.
+   */
   def expectTaskSetLocations(taskSet: TaskSet, locations: Seq[Seq[String]]) {
     assert(locations.size === taskSet.tasks.size)
     for ((expectLocs, taskLocs) <-
@@ -258,7 +276,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     }
   }

-  /** When we submit dummy Jobs, this is the compute function we supply. Except in a local test
+  /**
+   * When we submit dummy Jobs, this is the compute function we supply. Except in a local test
    * below, we do not expect this function to ever be executed; instead, we will return results
    * directly through CompletionEvents.
    */
@@ -266,8 +285,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     it.next._1.asInstanceOf[Int]

-  /** Start a job to compute the given RDD. Returns the JobWaiter that will
-   * collect the result of the job via callbacks from DAGScheduler. */
+  /**
+   * Start a job to compute the given RDD. Returns the JobWaiter that will
+   * collect the result of the job via callbacks from DAGScheduler.
+   */
   def submitRdd(rdd: MyRDD, allowLocal: Boolean = false): JobWaiter = {
     val (toSubmit, waiter) = scheduler.prepareJob[(Int, Int), Int](
       rdd,
@@ -281,7 +302,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     return waiter
   }

-  /** Assert that a job we started has failed. */
+  /**
+   * Assert that a job we started has failed.
+   */
   def expectJobException(waiter: JobWaiter = lastJobWaiter) {
     waiter.getResult match {
       case JobSucceeded(_) => fail()
@@ -289,7 +312,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     }
   }

-  /** Assert that a job we started has succeeded and has the given result. */
+  /**
+   * Assert that a job we started has succeeded and has the given result.
+   */
   def expectJobResult(expected: Array[Int], waiter: JobWaiter = lastJobWaiter) {
     waiter.getResult match {
       case JobSucceeded(answer) =>
@@ -500,7 +525,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
       ))
     }
     val recomputeOne = interceptStage(shuffleOneRdd) {
-      scheduler.resubmitFailedStages
+      scheduler.resubmitFailedStages()
     }
     val recomputeTwo = interceptStage(shuffleTwoRdd) {
       respondToTaskSet(recomputeOne, List(