diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 9680c6f3e1475d7f1111a5461a5613aebbbc6177..4db7339e6716bc25e3035257f80854d30b9fc46e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -88,7 +88,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to all elements of this RDD.
    */
   def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
   }
 
@@ -119,7 +119,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
     import scala.collection.JavaConverters._
     def fn = (x: T) => f.apply(x).asScala
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
   }
 
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 23ec6c3b311f0a78e58ed7f2b0a1387418a53f0f..8c573ac0d65e0847c4218880d8bdeaf6dbf7b7fc 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -387,18 +387,21 @@ public class JavaAPISuite implements Serializable {
         return 1.0 * x;
       }
     }).cache();
+    doubles.collect();
     JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
       @Override
       public Tuple2<Integer, Integer> call(Integer x) {
         return new Tuple2<Integer, Integer>(x, x);
       }
     }).cache();
+    pairs.collect();
     JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
       @Override
       public String call(Integer x) {
         return x.toString();
       }
     }).cache();
+    strings.collect();
   }
 
   @Test
@@ -962,4 +965,18 @@ public class JavaAPISuite implements Serializable {
     }
 
   }
+
+  @Test
+  public void collectAsMapWithIntArrayValues() {
+    // Regression test for SPARK-1040
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
+    JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
+      @Override
+      public Tuple2<Integer, int[]> call(Integer x) throws Exception {
+        return new Tuple2<Integer, int[]>(x, new int[] { x });
+      }
+    });
+    pairRDD.collect();  // Works fine
+    Map<Integer, int[]> map = pairRDD.collectAsMap();  // Used to crash with ClassCastException
+  }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index a493a8279f94218318be45b7bdc1ee372209dc64..64fe204cdf7a56a6afa273924f7f6f3ab0a297a6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -138,7 +138,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /** Return a new DStream by applying a function to all elements of this DStream. */
   def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
   }
 
@@ -159,7 +159,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
     import scala.collection.JavaConverters._
     def fn = (x: T) => f.apply(x).asScala
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 79fa6a623d2908e52e20f35d75cc852899836488..62cfa0a229db14b9ceecb930b511fd92aa0f768f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -745,7 +745,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   override val classTag: ClassTag[(K, V)] =
-    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
+    implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]]
 }
 
 object JavaPairDStream {