diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
index 20a7a3aa8c122ba7696df735bb3309c21fd50691..edd0fc56f861abbc0603975d0cd12e9944c9d447 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
@@ -19,8 +19,6 @@ package org.apache.spark.network.netty;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.oio.OioEventLoopGroup;
 import io.netty.channel.socket.oio.OioSocketChannel;
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
index 666432474dc75616a3f3a586c55996f5acdc8635..a99af348ce78207d156d51855038c1e63b5f3b24 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
@@ -20,7 +20,6 @@ package org.apache.spark.network.netty;
 import java.net.InetSocketAddress;
 
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.oio.OioEventLoopGroup;
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
index c9cbce5624afc3813bbe1e86ef9c8d374d577cc8..2090efd3b9990ae59dd3e102fdce90f82b173ed0 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.api.java;
 
-import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.List;
 
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java
index db34cd190ad3a583c73b0a1825f4118970855d03..ed92d31af59c304e307554b287ec5e658320ba81 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java
@@ -18,8 +18,6 @@
 package org.apache.spark.api.java.function;
 
 
-import scala.runtime.AbstractFunction1;
-
 import java.io.Serializable;
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.java b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
index f9dae6ed34cc6cfc04a35247af0a00bb8968038a..e97116986f77ac09f2c64fe6f21f9eb67624fa28 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
@@ -19,7 +19,6 @@ package org.apache.spark.api.java.function;
 
 import scala.reflect.ClassTag;
 import scala.reflect.ClassTag$;
-import scala.runtime.AbstractFunction1;
 
 import java.io.Serializable;
 
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
index 1659bfc55206d835f78c073329d36f5fc447ad74..cf77bb6b738c0a91ccc747b3ca64d0bb87dae1f1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
@@ -19,7 +19,6 @@ package org.apache.spark.api.java.function;
 
 import scala.reflect.ClassTag;
 import scala.reflect.ClassTag$;
-import scala.runtime.AbstractFunction2;
 
 import java.io.Serializable;
 
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
index 5a5c9b62960235fa29bb1fedb18c08da11e30f42..fbd0cdabe06d7d3c3c196e4a1c8b75c19ab8f7b6 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -20,7 +20,6 @@ package org.apache.spark.api.java.function;
 import scala.Tuple2;
 import scala.reflect.ClassTag;
 import scala.reflect.ClassTag$;
-import scala.runtime.AbstractFunction1;
 
 import java.io.Serializable;
 
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
index 4c39f483e571a676c350af24b2e1631619c95f3b..f09559627dabfd64b88e84f20aad5fdefcfedb76 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
@@ -20,7 +20,6 @@ package org.apache.spark.api.java.function;
 import scala.Tuple2;
 import scala.reflect.ClassTag;
 import scala.reflect.ClassTag$;
-import scala.runtime.AbstractFunction1;
 
 import java.io.Serializable;
 
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 67d45723badd8b4327a558664ffa37abdd5cc061..f291266fcf17c1546f34e6e12bc66accc6836e24 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -64,7 +64,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
           startDaemon()
           new Socket(daemonHost, daemonPort)
         }
-        case e => throw e
+        case e: Throwable => throw e
       }
     }
   }
@@ -198,7 +198,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
           }
         }.start()
       } catch {
-        case e => {
+        case e: Throwable => {
           stopDaemon()
           throw e
         }
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index fcfea96ad60b88f5863d0323d9019f91052082ee..72f84c99fc8c06b2a65724f1ff70f90a026611bc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy
 
 private[spark] object ExecutorState
-  extends Enumeration("LAUNCHING", "LOADING", "RUNNING", "KILLED", "FAILED", "LOST") {
+  extends Enumeration {
 
   val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
index 7e804223cf48a6459a7c60caa50cff33e7675d89..39ef090ddf110224195999a5d3c8904fbc996f3d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.master
 
 private[spark] object ApplicationState
-  extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
+  extends Enumeration {
 
   type ApplicationState = Value
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
index b5ee6dca79fab36aeace009d436d7fd1ea69e481..fb3fe88d92a4fdee9db15b3babe86c40c29bd78e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.master
 
-private[spark] object WorkerState extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED") {
+private[spark] object WorkerState extends Enumeration {
   type WorkerState = Value
 
   val ALIVE, DEAD, DECOMMISSIONED = Value
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index bca6956a182a2c3dbd049e3cc857d6d412e464c4..fe2946bcbe12019a91b4607c188529da18274cd7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -19,13 +19,14 @@ package org.apache.spark.rdd
 
 import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
 import org.apache.spark.storage.BlockManager
+import scala.reflect.ClassTag
 
 private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition {
   val index = idx
 }
 
 private[spark]
-class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
+class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[String])
   extends RDD[T](sc, Nil) {
 
   @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index c5de6362a9aa7c75298b3e38029e5ab8a4e43388..98da35763b9d15c96d9cd1d84330c729aec759bc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -22,6 +22,7 @@ import java.io.{ObjectOutputStream, IOException}
 import scala.collection.mutable
 import scala.Some
 import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
 
 /**
  * Class that captures a coalesced RDD by essentially keeping track of parent partitions
@@ -68,7 +69,7 @@ case class CoalescedRDDPartition(
  * @param maxPartitions number of desired partitions in the coalesced RDD
  * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
  */
-class CoalescedRDD[T: ClassManifest](
+class CoalescedRDD[T: ClassTag](
                                       @transient var prev: RDD[T],
                                       maxPartitions: Int,
                                       balanceSlack: Double = 0.10)
diff --git a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
index c8900d1a9346d45eaa51bf5127c6f0f755148dde..99c34c6cc5ed299a7c0cefe98c4fff109092633a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -18,12 +18,13 @@
 package org.apache.spark.rdd
 
 import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
+import scala.reflect.ClassTag
 
 
 /**
  * An RDD that is empty, i.e. has no element in it.
  */
-class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) {
+class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {
 
   override def getPartitions: Array[Partition] = Array.empty
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
index 5312dc0b593882e5868f9633e3c11c7ceb2efd7e..e74c83b90baa349f359790cb81ea511211ebe48d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
@@ -18,8 +18,9 @@
 package org.apache.spark.rdd
 
 import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
+import scala.reflect.ClassTag
 
-private[spark] class FilteredRDD[T: ClassManifest](
+private[spark] class FilteredRDD[T: ClassTag](
     prev: RDD[T],
     f: T => Boolean)
   extends RDD[T](prev) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
index cbdf6d84c07062654f2bc1e42c04616433b00376..4d1878fc142ac78a5821a3747a53af634a9b6628 100644
--- a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
@@ -18,10 +18,11 @@
 package org.apache.spark.rdd
 
 import org.apache.spark.{Partition, TaskContext}
+import scala.reflect.ClassTag
 
 
 private[spark]
-class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
+class FlatMappedRDD[U: ClassTag, T: ClassTag](
     prev: RDD[T],
     f: T => TraversableOnce[U])
   extends RDD[U](prev) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
index 829545d7b0aff389f48ffac2e067b53625879ffd..1a694475f6975185fac2459c39e3719c4507b11d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
@@ -18,8 +18,9 @@
 package org.apache.spark.rdd
 
 import org.apache.spark.{Partition, TaskContext}
+import scala.reflect.ClassTag
 
-private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T])
+private[spark] class GlommedRDD[T: ClassTag](prev: RDD[T])
   extends RDD[Array[T]](prev) {
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index 203179c4ea823efb7d1b48cbf3e612f289729ae9..cdb5946b49366da715d71287c8c67c9c02799703 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -18,10 +18,11 @@
 package org.apache.spark.rdd
 
 import org.apache.spark.{Partition, TaskContext}
+import scala.reflect.ClassTag
 
 
 private[spark]
-class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
+class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     prev: RDD[T],
     f: Iterator[T] => Iterator[U],
     preservesPartitioning: Boolean = false)
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala
index 3ed833901073a442709cac851793c5f8557447cc..3cf22851ddb546d11521b996774cb28fb2fc5f1a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.rdd
 
 import org.apache.spark.{Partition, TaskContext}
+import scala.reflect.ClassTag
 
 
 /**
@@ -26,7 +27,7 @@ import org.apache.spark.{Partition, TaskContext}
  * information such as the number of tuples in a partition.
  */
 private[spark]
-class MapPartitionsWithIndexRDD[U: ClassManifest, T: ClassManifest](
+class MapPartitionsWithIndexRDD[U: ClassTag, T: ClassTag](
     prev: RDD[T],
     f: (Int, Iterator[T]) => Iterator[U],
     preservesPartitioning: Boolean
diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
index e8be1c4816e455f0529cc8c5afcdfbdb117f9815..eb3b19907d77469e91fdf89ddd23a1700185a5cd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.rdd
 
 import org.apache.spark.{Partition, TaskContext}
+import scala.reflect.ClassTag
 
 private[spark]
-class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U)
+class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
   extends RDD[U](prev) {
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index 697be8b997bbdfa2d93efb92477839b382f9b5b8..4a465840c6f7444cf0cfa02f0d3679cb807e5691 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.rdd
 
 import org.apache.spark.{RangePartitioner, Logging}
+import scala.reflect.ClassTag
 
 /**
  * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
@@ -25,9 +26,9 @@ import org.apache.spark.{RangePartitioner, Logging}
  * use these functions. They will work with any key type that has a `scala.math.Ordered`
  * implementation.
  */
-class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest,
-                          V: ClassManifest,
-                          P <: Product2[K, V] : ClassManifest](
+class OrderedRDDFunctions[K <% Ordered[K]: ClassTag,
+                          V: ClassTag,
+                          P <: Product2[K, V] : ClassTag](
     self: RDD[P])
   extends Logging with Serializable {
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index aed585e6a12e44b7430f88360cc43d80b0c0993b..c8e623081ae17744950efebe9410443d5d5e3b25 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -51,7 +51,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
  * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
  */
-class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
+class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   extends Logging
   with SparkHadoopMapReduceUtil
   with Serializable {
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index 9537152335052165fe63762549992d5c38a14e59..b7205865cf09434552049329c2989d19cb725e69 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.rdd
 
 import org.apache.spark.{Dependency, Partitioner, SparkEnv, ShuffleDependency, Partition, TaskContext}
+import scala.reflect.ClassTag
 
 
 private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@@ -32,7 +33,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
  * @tparam K the key class.
  * @tparam V the value class.
  */
-class ShuffledRDD[K, V, P <: Product2[K, V] : ClassManifest](
+class ShuffledRDD[K, V, P <: Product2[K, V] : ClassTag](
     @transient var prev: RDD[P],
     part: Partitioner)
   extends RDD[P](prev.context, Nil) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 0ac3d7bcfdd2cebd8dbe5a4f58384fee0fb87dff..f61fde695704c2f14392a5b9437ccb99d70deb30 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -461,7 +461,7 @@ private[spark] class ClusterTaskSetManager(
         case cnf: ClassNotFoundException =>
           val loader = Thread.currentThread().getContextClassLoader
           throw new SparkException("ClassNotFound with classloader: " + loader, cnf)
-        case ex => throw ex
+        case ex: Throwable => throw ex
       }
       // Mark finished and stop if we've finished all the tasks
       finished(index) = true
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala
index 34811389a02f7d27c6203a1ad6c77cbd90a03a6a..16013b320814e4b47730a5a12e0e396bd1c3117e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala
@@ -22,7 +22,7 @@ package org.apache.spark.scheduler.cluster
  *    to order tasks amongst a Schedulable's sub-queues
  *  "NONE" is used when the a Schedulable has no sub-queues.
  */
-object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
+object SchedulingMode extends Enumeration {
 
   type SchedulingMode = Value
   val FAIR,FIFO,NONE = Value
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala
index 5d4130e14a8ad1ab40f87111e56596c7ff217f0d..8d8d70861228adf64a3c5f796f6248abfdd30d1d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
 
 
 private[spark] object TaskLocality
-  extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
+  extends Enumeration
 {
   // process local is expected to be used ONLY within tasksetmanager for now.
   val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index c719a54a61978997ba8d7ceb1701e1e33becb899..adc6ca94ffc3c8375123b34515d7febf04b3394d 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -313,7 +313,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
           Thread.sleep(200)
         }
       } catch {
-        case _ => { Thread.sleep(10) }
+        case _: Throwable => { Thread.sleep(10) }
           // Do nothing. We might see exceptions because block manager
           // is racing this thread to remove entries from the driver.
       }
diff --git a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
index 46a2da172407f009f4895e5d4d7efa362b0bf398..768ca3850e7e79d32b324ddcd7c72abfad00d9ed 100644
--- a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
+++ b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
@@ -37,7 +37,7 @@ class UnpersistSuite extends FunSuite with LocalSparkContext {
           Thread.sleep(200)
         }
       } catch {
-        case _ => { Thread.sleep(10) }
+        case _: Throwable => { Thread.sleep(10) }
           // Do nothing. We might see exceptions because block manager
           // is racing this thread to remove entries from the driver.
       }
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index c1df5e151e8acca7073a50209cfa4b5460795262..096023f47627dc8366bfbcf48570dd46972c3c4e 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -225,8 +225,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     // test that you get over 90% locality in each group
     val minLocality = coalesced2.partitions
       .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
-      .foldLeft(1.)((perc, loc) => math.min(perc,loc))
-    assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.).toInt + "%")
+      .foldLeft(1.0)((perc, loc) => math.min(perc,loc))
+    assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.0).toInt + "%")
 
     // test that the groups are load balanced with 100 +/- 20 elements in each
     val maxImbalance = coalesced2.partitions
@@ -238,9 +238,9 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     val coalesced3 = data3.coalesce(numMachines*2)
     val minLocality2 = coalesced3.partitions
       .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
-      .foldLeft(1.)((perc, loc) => math.min(perc,loc))
+      .foldLeft(1.0)((perc, loc) => math.min(perc,loc))
     assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
-      (minLocality2*100.).toInt + "%")
+      (minLocality2*100.0).toInt + "%")
   }
 
   test("zipped RDDs") {
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
index 152f02921338a8ab974effdd211e1365f5f55741..407cd7ccfaee0c964f7e98a81ffc847e1244d603 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -123,7 +123,7 @@ public class JavaLogQuery {
     });
 
     List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect();
-    for (Tuple2 t : output) {
+    for (Tuple2<?,?> t : output) {
       System.out.println(t._1 + "\t" + t._2);
     }
     System.exit(0);
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index c5603a639bdd9c1abf3127c7ebf859b362db4aac..89aed8f279654d87db2eacc879a3c560cbfe6db1 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -21,7 +21,6 @@ import scala.Tuple2;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -106,7 +105,7 @@ public class JavaPageRank {
 
     // Collects all URL ranks and dump them to console.
     List<Tuple2<String, Double>> output = ranks.collect();
-    for (Tuple2 tuple : output) {
+    for (Tuple2<?,?> tuple : output) {
         System.out.println(tuple._1 + " has rank: " + tuple._2 + ".");
     }
 
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 07d32ad659a74dd1bcfcdc808be0022646b8bd67..bd6383e13df7a6c75180b26487a0ce0c961e49c5 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -58,7 +58,7 @@ public class JavaWordCount {
     });
 
     List<Tuple2<String, Integer>> output = counts.collect();
-    for (Tuple2 tuple : output) {
+    for (Tuple2<?,?> tuple : output) {
       System.out.println(tuple._1 + ": " + tuple._2);
     }
     System.exit(0);
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
index 628cb892b686267c1996433faf324ab35b45ced4..45a0d237da314187ab2869a854583d6f330656f2 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
@@ -25,7 +25,6 @@ import org.apache.spark.mllib.recommendation.ALS;
 import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
 import org.apache.spark.mllib.recommendation.Rating;
 
-import java.io.Serializable;
 import java.util.Arrays;
 import java.util.StringTokenizer;
 
diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java
index 32d3934ac135a9ecb4aa2df50b03b88dcd3addbe..33b99f4bd3bcf72962e025599279b5d0e7e788f7 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java
@@ -77,7 +77,7 @@ public class JavaKMeansSuite implements Serializable {
 
   @Test
   public void runKMeansUsingStaticMethods() {
-    List<double[]> points = new ArrayList();
+    List<double[]> points = new ArrayList<double[]>();
     points.add(new double[]{1.0, 2.0, 6.0});
     points.add(new double[]{1.0, 3.0, 0.0});
     points.add(new double[]{1.0, 4.0, 6.0});
@@ -94,7 +94,7 @@ public class JavaKMeansSuite implements Serializable {
 
   @Test
   public void runKMeansUsingConstructor() {
-    List<double[]> points = new ArrayList();
+    List<double[]> points = new ArrayList<double[]>();
     points.add(new double[]{1.0, 2.0, 6.0});
     points.add(new double[]{1.0, 3.0, 0.0});
     points.add(new double[]{1.0, 4.0, 6.0});
diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
index 3323f6cee2b910d7e6f4fcf6d531b443a98e8b89..c474e0118807dd405926565b71f8ea734d5a51fe 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
@@ -20,8 +20,6 @@ package org.apache.spark.mllib.recommendation;
 import java.io.Serializable;
 import java.util.List;
 
-import scala.Tuple2;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
index 4eddc755b97f46e0e055917c3815363ae8f8b789..16c1567355850e53857d37643043f4c3d711ce8d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
@@ -21,9 +21,10 @@ import org.apache.spark.Partitioner
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.CoGroupedRDD
 import org.apache.spark.streaming.{Time, DStream, Duration}
+import scala.reflect.ClassTag
 
 private[streaming]
-class CoGroupedDStream[K : ClassManifest](
+class CoGroupedDStream[K : ClassTag](
     parents: Seq[DStream[(K, _)]],
     partitioner: Partitioner
   ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
index a9a05c9981f7436af96dc15b28bd282002c66c19..f396c347581ce285d30ed43d6279637815a7181e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -19,11 +19,12 @@ package org.apache.spark.streaming.dstream
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Time, StreamingContext}
+import scala.reflect.ClassTag
 
 /**
  * An input stream that always returns the same RDD on each timestep. Useful for testing.
  */
-class ConstantInputDStream[T: ClassManifest](ssc_ : StreamingContext, rdd: RDD[T])
+class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
   extends InputDStream[T](ssc_) {
 
   override def start() {}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
index 91ee2c1a36fa383a42a2c7ca940fe8a913ce974b..db2e0a4ceef0366ca0deefd5650df1ed0f336d94 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
 
 import org.apache.spark.streaming.{Duration, DStream, Time}
 import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
 
 private[streaming]
-class FilteredDStream[T: ClassManifest](
+class FilteredDStream[T: ClassTag](
     parent: DStream[T],
     filterFunc: T => Boolean
   ) extends DStream[T](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
index ca7d7ca49effd727cf04eb0629e13b6fe63b7de9..244dc3ee4fa143c8bde0bc08045c67545dc6929c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -20,9 +20,10 @@ package org.apache.spark.streaming.dstream
 import org.apache.spark.streaming.{Duration, DStream, Time}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
+import scala.reflect.ClassTag
 
 private[streaming]
-class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
     parent: DStream[(K, V)],
     flatMapValueFunc: V => TraversableOnce[U]
   ) extends DStream[(K, U)](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index b37966f9a79bd94da5460aee48f085eb1041f89b..336c4b7a92dc6c3754eb16436c34fa6ba4d2ea18 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
 
 import org.apache.spark.streaming.{Duration, DStream, Time}
 import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
 
 private[streaming]
-class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
+class FlatMappedDStream[T: ClassTag, U: ClassTag](
     parent: DStream[T],
     flatMapFunc: T => Traversable[U]
   ) extends DStream[U](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index e21bac460255c7e59416fc4d5f2f664654dc04a7..98b14cb224263778e3edfeb3aa1ad377f69f7bfc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Duration, DStream, Job, Time}
+import scala.reflect.ClassTag
 
 private[streaming]
-class ForEachDStream[T: ClassManifest] (
+class ForEachDStream[T: ClassTag] (
     parent: DStream[T],
     foreachFunc: (RDD[T], Time) => Unit
   ) extends DStream[Unit](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
index 4294b07d910f14f9f7bf10b4c2d834dc55fd3597..23136f44fa3103d76bfe13a6a4d9ba21706db9c1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
 
 import org.apache.spark.streaming.{Duration, DStream, Time}
 import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
 
 private[streaming]
-class GlommedDStream[T: ClassManifest](parent: DStream[T])
+class GlommedDStream[T: ClassTag](parent: DStream[T])
   extends DStream[Array[T]](parent.ssc) {
 
   override def dependencies = List(parent)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
index 5329601a6f949820c41650f1bd3f08e82b40065d..8a04060e5b6c11360fbcec5d02777aee7cf0753f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
 
 import org.apache.spark.streaming.{Duration, DStream, Time}
 import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
 
 private[streaming]
-class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
+class MapPartitionedDStream[T: ClassTag, U: ClassTag](
     parent: DStream[T],
     mapPartFunc: Iterator[T] => Iterator[U],
     preservePartitioning: Boolean
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
index 8290df90a2894d15d22daa3950da70fb1b01d89b..0ce364fd4632829d3b7f80945e27633626d5e346 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
@@ -20,9 +20,10 @@ package org.apache.spark.streaming.dstream
 import org.apache.spark.streaming.{Duration, DStream, Time}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
+import scala.reflect.ClassTag
 
 private[streaming]
-class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
     parent: DStream[(K, V)],
     mapValueFunc: V => U
   ) extends DStream[(K, U)](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
index b1682afea39244d72ed4f415619ce3475a842bd1..c0b7491d096cd64bc37d7b2d5ce97ba00feded48 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
 
 import org.apache.spark.streaming.{Duration, DStream, Time}
 import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
 
 private[streaming]
-class MappedDStream[T: ClassManifest, U: ClassManifest] (
+class MappedDStream[T: ClassTag, U: ClassTag] (
     parent: DStream[T],
     mapFunc: T => U
   ) extends DStream[U](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
index 15782f5c119054555d9b9d0548e49bd0e4d7c3d3..6f9477020a459141180c51421196ec70aa560a15 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.streaming.dstream
 
 import org.apache.spark.streaming.StreamingContext
+import scala.reflect.ClassTag
 
 private[streaming]
-class PluggableInputDStream[T: ClassManifest](
+class PluggableInputDStream[T: ClassTag](
   @transient ssc_ : StreamingContext,
   receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) {
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index 7d9f3521b1ce7d2362019320ebe60a02b6e425b9..97325f8ea3117d2e712036ee49c7b2d5b063e0db 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -19,13 +19,13 @@ package org.apache.spark.streaming.dstream
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.UnionRDD
-
 import scala.collection.mutable.Queue
 import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.streaming.{Time, StreamingContext}
+import scala.reflect.ClassTag
 
 private[streaming]
-class QueueInputDStream[T: ClassManifest](
+class QueueInputDStream[T: ClassTag](
     @transient ssc: StreamingContext,
     val queue: Queue[RDD[T]],
     oneAtATime: Boolean,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
index a95e66d7615ce2a06a171abda978d4d829b557bf..e6e00220979981c2f5a5db69d8254e26add226dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -21,9 +21,10 @@ import org.apache.spark.Partitioner
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import org.apache.spark.streaming.{Duration, DStream, Time}
+import scala.reflect.ClassTag
 
 private[streaming]
-class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
+class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
     parent: DStream[(K,V)],
     createCombiner: V => C,
     mergeValue: (C, V) => C,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 60485adef9594124f4dcfdb4a91115c03dbe5a63..73e1ddf7a473205c5c25bed58b3edd0957069a64 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Duration, DStream, Time}
+import scala.reflect.ClassTag
 
 private[streaming]
-class TransformedDStream[T: ClassManifest, U: ClassManifest] (
+class TransformedDStream[T: ClassTag, U: ClassTag] (
     parent: DStream[T],
     transformFunc: (RDD[T], Time) => RDD[U]
   ) extends DStream[U](parent.ssc) {
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 783b8dea31baa78b4ba6f54b8e0597e7b8c65733..076fb53fa1b546d110af5bbbd5e3da71864f0d10 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -21,34 +21,36 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
+
 import kafka.serializer.StringDecoder;
+
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+
 import scala.Tuple2;
+import twitter4j.Status;
+
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.dstream.SparkFlumeEvent;
 import org.apache.spark.streaming.JavaTestUtils;
 import org.apache.spark.streaming.JavaCheckpointTestUtils;
-import org.apache.spark.streaming.InputStreamsSuite;
 
 import java.io.*;
 import java.util.*;
 
 import akka.actor.Props;
 import akka.zeromq.Subscribe;
-import akka.util.ByteString;
 
 
 // The test suite itself is Serializable so that anonymous Function implementations can be
@@ -85,8 +87,8 @@ public class JavaAPISuite implements Serializable {
         Arrays.asList(3L),
         Arrays.asList(1L));
 
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream count = stream.count();
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Long> count = stream.count();
     JavaTestUtils.attachTestOutputStream(count);
     List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3);
     assertOrderInvariantEquals(expected, result);
@@ -102,8 +104,8 @@ public class JavaAPISuite implements Serializable {
         Arrays.asList(5,5),
         Arrays.asList(9,4));
 
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
         @Override
         public Integer call(String s) throws Exception {
           return s.length();
@@ -128,8 +130,8 @@ public class JavaAPISuite implements Serializable {
         Arrays.asList(7,8,9,4,5,6),
         Arrays.asList(7,8,9));
 
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream windowed = stream.window(new Duration(2000));
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> windowed = stream.window(new Duration(2000));
     JavaTestUtils.attachTestOutputStream(windowed);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
 
@@ -152,8 +154,8 @@ public class JavaAPISuite implements Serializable {
         Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18),
         Arrays.asList(13,14,15,16,17,18));
 
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000));
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> windowed = stream.window(new Duration(4000), new Duration(2000));
     JavaTestUtils.attachTestOutputStream(windowed);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4);
 
@@ -170,8 +172,8 @@ public class JavaAPISuite implements Serializable {
         Arrays.asList("giants"),
         Arrays.asList("yankees"));
 
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream filtered = stream.filter(new Function<String, Boolean>() {
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() {
       @Override
       public Boolean call(String s) throws Exception {
         return s.contains("a");
@@ -193,8 +195,8 @@ public class JavaAPISuite implements Serializable {
         Arrays.asList(Arrays.asList("giants", "dodgers")),
         Arrays.asList(Arrays.asList("yankees", "red socks")));
 
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream glommed = stream.glom();
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<List<String>> glommed = stream.glom();
     JavaTestUtils.attachTestOutputStream(glommed);
     List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
@@ -211,8 +213,8 @@ public class JavaAPISuite implements Serializable {
         Arrays.asList("GIANTSDODGERS"),
         Arrays.asList("YANKEESRED SOCKS"));
 
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<String> mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
       @Override
       public Iterable<String> call(Iterator<String> in) {
         String out = "";
@@ -254,8 +256,8 @@ public class JavaAPISuite implements Serializable {
         Arrays.asList(15),
         Arrays.asList(24));
 
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream reduced = stream.reduce(new IntegerSum());
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> reduced = stream.reduce(new IntegerSum());
     JavaTestUtils.attachTestOutputStream(reduced);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
 
@@ -275,8 +277,8 @@ public class JavaAPISuite implements Serializable {
         Arrays.asList(39),
         Arrays.asList(24));
 
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(),
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(new IntegerSum(),
         new IntegerDifference(), new Duration(2000), new Duration(1000));
     JavaTestUtils.attachTestOutputStream(reducedWindowed);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
@@ -349,8 +351,8 @@ public class JavaAPISuite implements Serializable {
         Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"),
         Arrays.asList("a","t","h","l","e","t","i","c","s"));
 
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
       @Override
       public Iterable<String> call(String x) {
         return Lists.newArrayList(x.split("(?!^)"));
@@ -396,8 +398,8 @@ public class JavaAPISuite implements Serializable {
             new Tuple2<Integer, String>(9, "c"),
             new Tuple2<Integer, String>(9, "s")));
 
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<Integer,String> flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
       @Override
       public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
         List<Tuple2<Integer, String>> out = Lists.newArrayList();
@@ -430,10 +432,10 @@ public class JavaAPISuite implements Serializable {
         Arrays.asList(2,2,5,5),
         Arrays.asList(3,3,6,6));
 
-    JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2);
-    JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2);
+    JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2);
+    JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2);
 
-    JavaDStream unioned = stream1.union(stream2);
+    JavaDStream<Integer> unioned = stream1.union(stream2);
     JavaTestUtils.attachTestOutputStream(unioned);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
 
@@ -444,7 +446,7 @@ public class JavaAPISuite implements Serializable {
    * Performs an order-invariant comparison of lists representing two RDD streams. This allows
    * us to account for ordering variation within individual RDD's which occurs during windowing.
    */
-  public static <T extends Comparable> void assertOrderInvariantEquals(
+  public static <T extends Comparable<T>> void assertOrderInvariantEquals(
       List<List<T>> expected, List<List<T>> actual) {
     for (List<T> list: expected) {
       Collections.sort(list);
@@ -467,11 +469,11 @@ public class JavaAPISuite implements Serializable {
         Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
         Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
 
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = stream.map(
         new PairFunction<String, String, Integer>() {
           @Override
-          public Tuple2 call(String in) throws Exception {
+          public Tuple2<String, Integer> call(String in) throws Exception {
             return new Tuple2<String, Integer>(in, in.length());
           }
         });
@@ -1163,8 +1165,8 @@ public class JavaAPISuite implements Serializable {
     File tempDir = Files.createTempDir();
     ssc.checkpoint(tempDir.getAbsolutePath());
 
-    JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+    JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
       @Override
       public Integer call(String s) throws Exception {
         return s.length();
@@ -1220,20 +1222,20 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void testKafkaStream() {
     HashMap<String, Integer> topics = Maps.newHashMap();
-    JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
-    JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics,
+    JavaDStream<String> test1 = ssc.kafkaStream("localhost:12345", "group", topics);
+    JavaDStream<String> test2 = ssc.kafkaStream("localhost:12345", "group", topics,
       StorageLevel.MEMORY_AND_DISK());
 
     HashMap<String, String> kafkaParams = Maps.newHashMap();
     kafkaParams.put("zk.connect","localhost:12345");
     kafkaParams.put("groupid","consumer-group");
-    JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
+    JavaDStream<String> test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
       StorageLevel.MEMORY_AND_DISK());
   }
 
   @Test
   public void testSocketTextStream() {
-    JavaDStream test = ssc.socketTextStream("localhost", 12345);
+    JavaDStream<String> test = ssc.socketTextStream("localhost", 12345);
   }
 
   @Test
@@ -1253,7 +1255,7 @@ public class JavaAPISuite implements Serializable {
       }
     }
 
-    JavaDStream test = ssc.socketStream(
+    JavaDStream<String> test = ssc.socketStream(
       "localhost",
       12345,
       new Converter(),
@@ -1262,39 +1264,39 @@ public class JavaAPISuite implements Serializable {
 
   @Test
   public void testTextFileStream() {
-    JavaDStream test = ssc.textFileStream("/tmp/foo");
+    JavaDStream<String> test = ssc.textFileStream("/tmp/foo");
   }
 
   @Test
   public void testRawSocketStream() {
-    JavaDStream test = ssc.rawSocketStream("localhost", 12345);
+    JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345);
   }
 
   @Test
   public void testFlumeStream() {
-    JavaDStream test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
+    JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
   }
 
   @Test
   public void testFileStream() {
     JavaPairDStream<String, String> foo =
-      ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
+      ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo");
   }
 
   @Test
   public void testTwitterStream() {
     String[] filters = new String[] { "good", "bad", "ugly" };
-    JavaDStream test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY());
+    JavaDStream<Status> test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY());
   }
 
   @Test
   public void testActorStream() {
-    JavaDStream test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY());
+    JavaDStream<String> test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY());
   }
 
   @Test
   public void testZeroMQStream() {
-    JavaDStream test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() {
+    JavaDStream<String> test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() {
       @Override
       public Iterable<String> call(byte[][] b) throws Exception {
         return null;