From ee888f6b251c4f06f2edf15267d12e42e28fd22f Mon Sep 17 00:00:00 2001
From: Mark Hamstra <markhamstra@gmail.com>
Date: Thu, 5 Dec 2013 21:53:40 -0800
Subject: [PATCH] FutureAction result tests

---
 .../spark/rdd/AsyncRDDActionsSuite.scala      | 26 +++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index da032b17d9..0d4c10db8e 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.rdd
 
 import java.util.concurrent.Semaphore
 
+import scala.concurrent.{Await, TimeoutException}
+import scala.concurrent.duration.Duration
 import scala.concurrent.ExecutionContext.Implicits.global
 
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
@@ -173,4 +175,28 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll with Timeouts
       sem.acquire(2)
     }
   }
+
+  /**
+   * Awaiting FutureAction results
+   */
+  test("FutureAction result, infinite wait") {
+    val f = sc.parallelize(1 to 100, 4)
+              .countAsync()
+    assert(Await.result(f, Duration.Inf) === 100)
+  }
+
+  test("FutureAction result, finite wait") {
+    val f = sc.parallelize(1 to 100, 4)
+              .countAsync()
+    assert(Await.result(f, Duration(30, "seconds")) === 100)
+  }
+
+  test("FutureAction result, timeout") {
+    val f = sc.parallelize(1 to 100, 4)
+              .mapPartitions(itr => { Thread.sleep(20); itr })
+              .countAsync()
+    intercept[TimeoutException] {
+      Await.result(f, Duration(20, "milliseconds"))
+    }
+  }
 }
-- 
GitLab