Skip to content
Snippets Groups Projects
Commit 07804987 authored by Reynold Xin's avatar Reynold Xin
Browse files

Merge pull request #232 from markhamstra/FiniteWait

jobWaiter.synchronized before jobWaiter.wait

...else ``IllegalMonitorStateException`` in ``SimpleFutureAction#ready``.
parents 5d460253 ee888f6b
No related branches found
No related tags found
No related merge requests found
...@@ -99,7 +99,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: ...@@ -99,7 +99,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = { override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
if (!atMost.isFinite()) { if (!atMost.isFinite()) {
awaitResult() awaitResult()
} else { } else jobWaiter.synchronized {
val finishTime = System.currentTimeMillis() + atMost.toMillis val finishTime = System.currentTimeMillis() + atMost.toMillis
while (!isCompleted) { while (!isCompleted) {
val time = System.currentTimeMillis() val time = System.currentTimeMillis()
......
...@@ -31,6 +31,7 @@ private[spark] class JobWaiter[T]( ...@@ -31,6 +31,7 @@ private[spark] class JobWaiter[T](
private var finishedTasks = 0 private var finishedTasks = 0
// Is the job as a whole finished (succeeded or failed)? // Is the job as a whole finished (succeeded or failed)?
@volatile
private var _jobFinished = totalTasks == 0 private var _jobFinished = totalTasks == 0
def jobFinished = _jobFinished def jobFinished = _jobFinished
......
...@@ -19,6 +19,8 @@ package org.apache.spark.rdd ...@@ -19,6 +19,8 @@ package org.apache.spark.rdd
import java.util.concurrent.Semaphore import java.util.concurrent.Semaphore
import scala.concurrent.{Await, TimeoutException}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.ExecutionContext.Implicits.global
import org.scalatest.{BeforeAndAfterAll, FunSuite} import org.scalatest.{BeforeAndAfterAll, FunSuite}
...@@ -173,4 +175,28 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll with Timeouts ...@@ -173,4 +175,28 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll with Timeouts
sem.acquire(2) sem.acquire(2)
} }
} }
/**
* Awaiting FutureAction results
*/
test("FutureAction result, infinite wait") {
val f = sc.parallelize(1 to 100, 4)
.countAsync()
assert(Await.result(f, Duration.Inf) === 100)
}
test("FutureAction result, finite wait") {
val f = sc.parallelize(1 to 100, 4)
.countAsync()
assert(Await.result(f, Duration(30, "seconds")) === 100)
}
test("FutureAction result, timeout") {
val f = sc.parallelize(1 to 100, 4)
.mapPartitions(itr => { Thread.sleep(20); itr })
.countAsync()
intercept[TimeoutException] {
Await.result(f, Duration(20, "milliseconds"))
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment