diff --git a/core/src/main/java/org/apache/spark/api/java/JavaFutureAction.java b/core/src/main/java/org/apache/spark/api/java/JavaFutureAction.java
new file mode 100644
index 0000000000000000000000000000000000000000..0ad189633e427637343036a3ee05d8551b21aeb7
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/JavaFutureAction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+
+import java.util.List;
+import java.util.concurrent.Future;
+
+public interface JavaFutureAction<T> extends Future<T> {
+
+  /**
+   * Returns the job IDs run by the underlying async operation.
+   *
+   * This returns the current snapshot of the job list. Certain operations may run multiple
+   * jobs, so multiple calls to this method may return different lists.
+   */
+  List<Integer> jobIds();
+}
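For context, a minimal usage sketch of the new interface (a local master, and the class and app names, are illustrative assumptions, not part of this patch):

```java
import java.util.Arrays;

import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class JavaFutureActionDemo {
  public static void main(String[] args) throws Exception {
    JavaSparkContext sc = new JavaSparkContext("local[2]", "java-future-action-demo");
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

    // countAsync() (added to JavaRDDLike in this patch) returns a JavaFutureAction:
    // an ordinary java.util.concurrent.Future whose jobIds() exposes the Spark jobs
    // submitted so far by the asynchronous action.
    JavaFutureAction<Long> future = rdd.countAsync();
    System.out.println("Jobs submitted so far: " + future.jobIds());

    Long count = future.get();  // blocks until the background job finishes
    System.out.println("Count = " + count + ", jobs = " + future.jobIds());
    sc.stop();
  }
}
```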
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index e8f761eaa57991673575b1cc24e1213852b99d77..d5c8f9d76c47688032d56fbb8919b241a22958d8 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -17,20 +17,21 @@
 
 package org.apache.spark
 
-import scala.concurrent._
-import scala.concurrent.duration.Duration
-import scala.util.Try
+import java.util.Collections
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent._
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Try}
 
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaFutureAction
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
 
 /**
- * :: Experimental ::
  * A future for the result of an action to support cancellation. This is an extension of the
  * Scala Future interface to support cancellation.
  */
-@Experimental
 trait FutureAction[T] extends Future[T] {
   // Note that we redefine methods of the Future trait here explicitly so we can specify a different
   // documentation (with reference to the word "action").
@@ -69,6 +70,11 @@ trait FutureAction[T] extends Future[T] {
    */
   override def isCompleted: Boolean
 
+  /**
+   * Returns whether the action has been cancelled.
+   */
+  def isCancelled: Boolean
+
   /**
    * The value of this Future.
    *
@@ -96,15 +102,16 @@ trait FutureAction[T] extends Future[T] {
 
 
 /**
- * :: Experimental ::
  * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
  * count, collect, reduce.
  */
-@Experimental
 class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
   extends FutureAction[T] {
 
+  @volatile private var _cancelled: Boolean = false
+
   override def cancel() {
+    _cancelled = true
     jobWaiter.cancel()
   }
 
@@ -143,6 +150,8 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }
 
   override def isCompleted: Boolean = jobWaiter.jobFinished
+
+  override def isCancelled: Boolean = _cancelled
 
   override def value: Option[Try[T]] = {
     if (jobWaiter.jobFinished) {
@@ -164,12 +173,10 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
 
 
 /**
- * :: Experimental ::
  * A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
  * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
  * action thread if it is being blocked by a job.
  */
-@Experimental
 class ComplexFutureAction[T] extends FutureAction[T] {
 
   // Pointer to the thread that is executing the action. It is set when the action is run.
@@ -222,7 +229,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
     // If the action hasn't been cancelled yet, submit the job. The check and the submitJob
     // command need to be in an atomic block.
     val job = this.synchronized {
-      if (!cancelled) {
+      if (!isCancelled) {
         rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
       } else {
         throw new SparkException("Action has been cancelled")
@@ -243,10 +250,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
     }
   }
 
-  /**
-   * Returns whether the promise has been cancelled.
-   */
-  def cancelled: Boolean = _cancelled
+  override def isCancelled: Boolean = _cancelled
 
   @throws(classOf[InterruptedException])
   @throws(classOf[scala.concurrent.TimeoutException])
@@ -271,3 +275,55 @@ class ComplexFutureAction[T] extends FutureAction[T] {
   def jobIds = jobs
 
 }
+
+private[spark]
+class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S => T)
+  extends JavaFutureAction[T] {
+
+  import scala.collection.JavaConverters._
+
+  override def isCancelled: Boolean = futureAction.isCancelled
+
+  override def isDone: Boolean = {
+    // According to java.util.Future's Javadoc, this returns true if the task completed,
+    // whether that was due to successful execution, an exception, or cancellation.
+    futureAction.isCancelled || futureAction.isCompleted
+  }
+
+  override def jobIds(): java.util.List[java.lang.Integer] = {
+    Collections.unmodifiableList(futureAction.jobIds.map(Integer.valueOf).asJava)
+  }
+
+  private def getImpl(timeout: Duration): T = {
+    // This will throw TimeoutException on timeout:
+    Await.ready(futureAction, timeout)
+    futureAction.value.get match {
+      case scala.util.Success(value) => converter(value)
+      case Failure(exception) =>
+        if (isCancelled) {
+          throw new CancellationException("Job cancelled").initCause(exception)
+        } else {
+          // java.util.Future.get() wraps exceptions in ExecutionException
+          throw new ExecutionException("Exception thrown by job", exception)
+        }
+    }
+  }
+
+  override def get(): T = getImpl(Duration.Inf)
+
+  override def get(timeout: Long, unit: TimeUnit): T =
+    getImpl(Duration.fromNanos(unit.toNanos(timeout)))
+
+  override def cancel(mayInterruptIfRunning: Boolean): Boolean = synchronized {
+    if (isDone) {
+      // Per java.util.Future's Javadoc, cancel() should return false once the task has completed.
+      false
+    } else {
+      // We cannot fully honor the mayInterruptIfRunning flag here: our cancellation is
+      // asynchronous and always cancels the underlying job, even if it is already running.
+      futureAction.cancel()
+      true
+    }
+  }
+
+}
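A quick sketch of the Future contract the wrapper above implements: a timed get() surfaces an expired timeout as TimeoutException (via Await.ready), and cancel() returns true only while the action is still outstanding, false once it is done. The master, sleep duration, and class name below are illustrative assumptions, not part of the patch:

```java
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class WrapperContractDemo {
  public static void main(String[] args) throws Exception {
    JavaSparkContext sc = new JavaSparkContext("local[2]", "wrapper-contract-demo");
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3), 1);

    JavaFutureAction<Void> slow = rdd.foreachAsync(new VoidFunction<Integer>() {
      @Override
      public void call(Integer i) throws Exception {
        Thread.sleep(60000);  // keep the job running so the timeout and cancel paths are hit
      }
    });

    try {
      // The wrapper's getImpl() uses Await.ready, so a timed get() throws TimeoutException.
      slow.get(100, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      System.out.println("Still running after 100 ms, cancelling");
      System.out.println("cancel() returned " + slow.cancel(true));        // true: not done yet
      System.out.println("cancel() again returned " + slow.cancel(true));  // false: already done
    }
    sc.stop();
  }
}
```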
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index c74439948334965beee5d3b5ee2c0b2076cf3861..efb8978f7ce1290dd588192ec0ce7621954bea8a 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -21,12 +21,14 @@ import java.util.{Comparator, List => JList, Iterator => JIterator}
 import java.lang.{Iterable => JIterable, Long => JLong}
 
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec
 
-import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext}
+import org.apache.spark._
+import org.apache.spark.SparkContext._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
@@ -294,8 +296,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Applies a function f to all elements of this RDD.
    */
   def foreach(f: VoidFunction[T]) {
-    val cleanF = rdd.context.clean((x: T) => f.call(x))
-    rdd.foreach(cleanF)
+    rdd.foreach(x => f.call(x))
   }
 
   /**
@@ -576,16 +577,44 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def name(): String = rdd.name
 
   /**
-   * :: Experimental ::
-   * The asynchronous version of the foreach action.
-   *
-   * @param f the function to apply to all the elements of the RDD
-   * @return a FutureAction for the action
+   * The asynchronous version of `count`, which returns a
+   * future for counting the number of elements in this RDD.
    */
-  @Experimental
-  def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = {
-    import org.apache.spark.SparkContext._
-    rdd.foreachAsync(x => f.call(x))
+  def countAsync(): JavaFutureAction[JLong] = {
+    new JavaFutureActionWrapper[Long, JLong](rdd.countAsync(), JLong.valueOf)
+  }
+
+  /**
+   * The asynchronous version of `collect`, which returns a future for
+   * retrieving an array containing all of the elements in this RDD.
+   */
+  def collectAsync(): JavaFutureAction[JList[T]] = {
+    new JavaFutureActionWrapper(rdd.collectAsync(), (x: Seq[T]) => x.asJava)
+  }
+
+  /**
+   * The asynchronous version of the `take` action, which returns a
+   * future for retrieving the first `num` elements of this RDD.
+   */
+  def takeAsync(num: Int): JavaFutureAction[JList[T]] = {
+    new JavaFutureActionWrapper(rdd.takeAsync(num), (x: Seq[T]) => x.asJava)
   }
 
+  /**
+   * The asynchronous version of the `foreach` action, which
+   * applies a function f to all the elements of this RDD.
+   */
+  def foreachAsync(f: VoidFunction[T]): JavaFutureAction[Void] = {
+    new JavaFutureActionWrapper[Unit, Void](rdd.foreachAsync(x => f.call(x)),
+      { x => null.asInstanceOf[Void] })
+  }
+
+  /**
+   * The asynchronous version of the `foreachPartition` action, which
+   * applies a function f to each partition of this RDD.
+   */
+  def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = {
+    new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x)),
+      { x => null.asInstanceOf[Void] })
+  }
 }
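The tests later in this patch exercise countAsync, collectAsync and foreachAsync; takeAsync and foreachPartitionAsync follow the same pattern. A hedged sketch of those two (local master and class name are illustrative assumptions):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class TakeAndForeachPartitionDemo {
  public static void main(String[] args) throws Exception {
    JavaSparkContext sc = new JavaSparkContext("local[2]", "take-async-demo");
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);

    // takeAsync may submit several jobs while it scans partitions for enough elements,
    // so jobIds() can grow between calls.
    JavaFutureAction<List<Integer>> firstThree = rdd.takeAsync(3);
    System.out.println("take(3) = " + firstThree.get() + ", jobs = " + firstThree.jobIds());

    // foreachPartitionAsync applies the function once per partition and completes with Void.
    JavaFutureAction<Void> perPartition = rdd.foreachPartitionAsync(
        new VoidFunction<Iterator<Integer>>() {
          @Override
          public void call(Iterator<Integer> it) throws Exception {
            int count = 0;
            while (it.hasNext()) {
              it.next();
              count++;
            }
            System.out.println("Saw a partition with " + count + " elements");
          }
        });
    perPartition.get();
    sc.stop();
  }
}
```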
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index ede5568493cc0e7ce7b9f87dc7b1b233b77f6fa2..9f9f10b7ebc3a18205c6eb4bf83b79c93f037cc4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -24,14 +24,11 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import scala.reflect.ClassTag
 
 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
-import org.apache.spark.annotation.Experimental
 
 /**
- * :: Experimental ::
  * A set of asynchronous RDD actions available through an implicit conversion.
  * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
  */
-@Experimental
 class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {
 
   /**
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index b8fa822ae4bd8f43180e991635555240c32203be..3190148fb5f4314ee4348de48ed3ab0aaca8e872 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -20,6 +20,7 @@ package org.apache.spark;
 import java.io.*;
 import java.net.URI;
 import java.util.*;
+import java.util.concurrent.*;
 
 import scala.Tuple2;
 import scala.Tuple3;
@@ -29,6 +30,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.base.Throwables;
 import com.google.common.base.Optional;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
@@ -43,10 +45,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.spark.api.java.JavaDoubleRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.*;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.partial.BoundedDouble;
@@ -1308,6 +1307,92 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(data.size(), collected.length);
   }
 
+  private static final class BuggyMapFunction<T> implements Function<T, T> {
+
+    @Override
+    public T call(T x) throws Exception {
+      throw new IllegalStateException("Custom exception!");
+    }
+  }
+
+  @Test
+  public void collectAsync() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<List<Integer>> future = rdd.collectAsync();
+    List<Integer> result = future.get();
+    Assert.assertEquals(data, result);
+    Assert.assertFalse(future.isCancelled());
+    Assert.assertTrue(future.isDone());
+    Assert.assertEquals(1, future.jobIds().size());
+  }
+
+  @Test
+  public void foreachAsync() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<Void> future = rdd.foreachAsync(
+        new VoidFunction<Integer>() {
+          @Override
+          public void call(Integer integer) throws Exception {
+            // intentionally left blank.
+          }
+        }
+    );
+    future.get();
+    Assert.assertFalse(future.isCancelled());
+    Assert.assertTrue(future.isDone());
+    Assert.assertEquals(1, future.jobIds().size());
+  }
+
+  @Test
+  public void countAsync() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<Long> future = rdd.countAsync();
+    long count = future.get();
+    Assert.assertEquals(data.size(), count);
+    Assert.assertFalse(future.isCancelled());
+    Assert.assertTrue(future.isDone());
+    Assert.assertEquals(1, future.jobIds().size());
+  }
+
+  @Test
+  public void testAsyncActionCancellation() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<Integer>() {
+      @Override
+      public void call(Integer integer) throws Exception {
+        Thread.sleep(10000);  // To ensure that the job won't finish before it's cancelled.
+      }
+    });
+    future.cancel(true);
+    Assert.assertTrue(future.isCancelled());
+    Assert.assertTrue(future.isDone());
+    try {
+      future.get(2000, TimeUnit.MILLISECONDS);
+      Assert.fail("Expected future.get() for cancelled job to throw CancellationException");
+    } catch (CancellationException ignored) {
+      // pass
+    }
+  }
+
+  @Test
+  public void testAsyncActionErrorWrapping() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<Integer>()).countAsync();
+    try {
+      future.get(2, TimeUnit.SECONDS);
+      Assert.fail("Expected future.get() for failed job to throw ExcecutionException");
+    } catch (ExecutionException ee) {
+      Assert.assertTrue(Throwables.getStackTraceAsString(ee).contains("Custom exception!"));
+    }
+    Assert.assertTrue(future.isDone());
+  }
+
+
   /**
    * Test for SPARK-3647. This test needs to use the maven-built assembly to trigger the issue,
    * since that's the only artifact where Guava classes have been relocated.
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 350aad47735e4203a0ac6e2e054b536fbed11434..c58666af84f24ffa6b3e3ec3fc6e83e24d0b17c7 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -54,7 +54,18 @@ object MimaExcludes {
             // TaskContext was promoted to Abstract class
             ProblemFilters.exclude[AbstractClassProblem](
               "org.apache.spark.TaskContext")
-
+          ) ++ Seq(
+            // Adding new methods to the JavaRDDLike trait:
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.java.JavaRDDLike.takeAsync"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.java.JavaRDDLike.foreachPartitionAsync"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.java.JavaRDDLike.countAsync"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.java.JavaRDDLike.collectAsync")
           )
 
         case v if v.startsWith("1.1") =>