Skip to content
Snippets Groups Projects
Commit e2047d39 authored by Reynold Xin
Browse files

Making takeAsync and collectAsync deterministic.

parent 09f76092
No related branches found
No related tags found
No related merge requests found
...@@ -177,10 +177,6 @@ class CancellablePromise[T] extends FutureAction[T] with Promise[T] { ...@@ -177,10 +177,6 @@ class CancellablePromise[T] extends FutureAction[T] with Promise[T] {
def run(func: => T)(implicit executor: ExecutionContext): Unit = scala.concurrent.future { def run(func: => T)(implicit executor: ExecutionContext): Unit = scala.concurrent.future {
thread = Thread.currentThread thread = Thread.currentThread
try { try {
if (cancelled) {
// This action has been cancelled before this thread even started running.
this.failure(new SparkException("action cancelled"))
}
this.success(func) this.success(func)
} catch { } catch {
case e: Exception => this.failure(e) case e: Exception => this.failure(e)
......
...@@ -54,9 +54,9 @@ class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with ...@@ -54,9 +54,9 @@ class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with
* Return a future for retrieving all elements of this RDD. * Return a future for retrieving all elements of this RDD.
*/ */
def collectAsync(): FutureAction[Seq[T]] = { def collectAsync(): FutureAction[Seq[T]] = {
val results = new ArrayBuffer[T] val results = new Array[Array[T]](self.partitions.size)
self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size), self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size),
(index, data) => results ++= data, results) (index, data) => results(index) = data, results.flatten.toSeq)
} }
/** /**
...@@ -66,10 +66,10 @@ class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with ...@@ -66,10 +66,10 @@ class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with
val promise = new CancellablePromise[Seq[T]] val promise = new CancellablePromise[Seq[T]]
promise.run { promise.run {
val buf = new ArrayBuffer[T](num) val results = new ArrayBuffer[T](num)
val totalParts = self.partitions.length val totalParts = self.partitions.length
var partsScanned = 0 var partsScanned = 0
while (buf.size < num && partsScanned < totalParts) { while (results.size < num && partsScanned < totalParts) {
// The number of partitions to try in this iteration. It is ok for this number to be // The number of partitions to try in this iteration. It is ok for this number to be
// greater than totalParts because we actually cap it at totalParts in runJob. // greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1 var numPartsToTry = 1
...@@ -77,26 +77,28 @@ class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with ...@@ -77,26 +77,28 @@ class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with
// If we didn't find any rows after the first iteration, just try all partitions next. // If we didn't find any rows after the first iteration, just try all partitions next.
// Otherwise, interpolate the number of partitions we need to try, but overestimate it // Otherwise, interpolate the number of partitions we need to try, but overestimate it
// by 50%. // by 50%.
if (buf.size == 0) { if (results.size == 0) {
numPartsToTry = totalParts - 1 numPartsToTry = totalParts - 1
} else { } else {
numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt numPartsToTry = (1.5 * num * partsScanned / results.size).toInt
} }
} }
numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions
val left = num - buf.size val left = num - results.size
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
val buf = new Array[Array[T]](p.size)
promise.runJob(self, promise.runJob(self,
(it: Iterator[T]) => it.take(left).toArray, (it: Iterator[T]) => it.take(left).toArray,
p, p,
(index: Int, data: Array[T]) => buf ++= data.take(num - buf.size), (index: Int, data: Array[T]) => buf(index) = data,
Unit) Unit)
buf.foreach(results ++= _.take(num - results.size))
partsScanned += numPartsToTry partsScanned += numPartsToTry
} }
buf.toSeq results.toSeq
} }
promise.future promise.future
......
...@@ -53,8 +53,7 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll { ...@@ -53,8 +53,7 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll {
test("collectAsync") { test("collectAsync") {
assert(zeroPartRdd.collectAsync().get() === Seq.empty) assert(zeroPartRdd.collectAsync().get() === Seq.empty)
// Note that we sort the collected output because the order is indeterministic. val collected = sc.parallelize(1 to 1000, 3).collectAsync().get()
val collected = sc.parallelize(1 to 1000, 3).collectAsync().get().sorted
assert(collected === (1 to 1000)) assert(collected === (1 to 1000))
} }
...@@ -80,10 +79,9 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll { ...@@ -80,10 +79,9 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll {
test("takeAsync") { test("takeAsync") {
def testTake(rdd: RDD[Int], input: Seq[Int], num: Int) { def testTake(rdd: RDD[Int], input: Seq[Int], num: Int) {
// Note that we sort the collected output because the order is indeterministic. val expected = input.take(num)
val expected = input.take(num).size val saw = rdd.takeAsync(num).get()
val saw = rdd.takeAsync(num).get().size assert(saw == expected, "incorrect result for rdd with %d partitions (expected %s, saw %s)"
assert(saw == expected, "incorrect result for rdd with %d partitions (expected %d, saw %d)"
.format(rdd.partitions.size, expected, saw)) .format(rdd.partitions.size, expected, saw))
} }
val input = Range(1, 1000) val input = Range(1, 1000)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment