diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 75ea535f2f57be20c4e7ec9289a8da5e306162d4..e8f761eaa57991673575b1cc24e1213852b99d77 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -83,6 +83,15 @@ trait FutureAction[T] extends Future[T] {
    */
   @throws(classOf[Exception])
   def get(): T = Await.result(this, Duration.Inf)
+
+  /**
+   * Returns the job IDs run by the underlying async operation.
+   *
+   * This returns the current snapshot of the job list. Certain operations may run multiple
+   * jobs, so multiple calls to this method may return different lists.
+   */
+  def jobIds: Seq[Int]
+
 }
 
 
@@ -150,8 +159,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
     }
   }
 
-  /** Get the corresponding job id for this action. */
-  def jobId = jobWaiter.jobId
+  def jobIds = Seq(jobWaiter.jobId)
 }
 
 
@@ -171,6 +179,8 @@ class ComplexFutureAction[T] extends FutureAction[T] {
   // is cancelled before the action was even run (and thus we have no thread to interrupt).
   @volatile private var _cancelled: Boolean = false
 
+  @volatile private var jobs: Seq[Int] = Nil
+
   // A promise used to signal the future.
   private val p = promise[T]()
 
@@ -219,6 +229,8 @@ class ComplexFutureAction[T] extends FutureAction[T] {
       }
     }
 
+    this.jobs = jobs ++ job.jobIds
+
     // Wait for the job to complete. If the action is cancelled (with an interrupt),
     // cancel the job and stop the execution. This is not in a synchronized block because
     // Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
@@ -255,4 +267,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
   override def isCompleted: Boolean = p.isCompleted
 
   override def value: Option[Try[T]] = p.future.value
+
+  def jobIds = jobs
+
 }
diff --git a/core/src/test/scala/org/apache/spark/FutureActionSuite.scala b/core/src/test/scala/org/apache/spark/FutureActionSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..db9c25fc457a4a6159086e38d20978b30382d9a3
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/FutureActionSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import org.apache.spark.SparkContext._
+
+class FutureActionSuite extends FunSuite with BeforeAndAfter with Matchers with LocalSparkContext {
+
+  before {
+    sc = new SparkContext("local", "FutureActionSuite")
+  }
+
+  test("simple async action") {
+    val rdd = sc.parallelize(1 to 10, 2)
+    val job = rdd.countAsync()
+    val res = Await.result(job, Duration.Inf)
+    res should be (10)
+    job.jobIds.size should be (1)
+  }
+
+  test("complex async action") {
+    val rdd = sc.parallelize(1 to 15, 3)
+    val job = rdd.takeAsync(10)
+    val res = Await.result(job, Duration.Inf)
+    res should be (1 to 10)
+    job.jobIds.size should be (2)
+  }
+
+}