diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 4ad2bdf8a829690971a5cf7e58e1a9e271e69410..b35d9032f1d63bc5749de407fc32412c47d7f065 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -99,18 +99,26 @@ class JavaStreamingContext(val ssc: StreamingContext) {
 
   /**
   * Create an input stream that pulls messages from a Kafka broker.
+   * @param typeClass Class of the values in the DStream
+   * @param decoderClass Class of the Kafka decoder used to deserialize messages
   * @param kafkaParams Map of Kafka configuration parameters. See: http://kafka.apache.org/configuration.html
    * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
    * in its own thread.
    * @param storageLevel RDD storage level. Defaults to memory-only
    */
-  def kafkaStream[T, D <: kafka.serializer.Decoder[_]: Manifest](
+  def kafkaStream[T, D <: kafka.serializer.Decoder[_]](
+    typeClass: Class[T],
+    decoderClass: Class[D],
     kafkaParams: JMap[String, String],
     topics: JMap[String, JInt],
     storageLevel: StorageLevel)
   : JavaDStream[T] = {
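+    // Manifests cannot be passed in from Java, so cast the AnyRef manifests to T and D;
+    // the Class parameters exist so Java callers can pin down these types.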
     implicit val cmt: ClassManifest[T] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    implicit val cmd: Manifest[D] =
+      implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]]
     ssc.kafkaStream[T, D](
       kafkaParams.toMap,
       Map(topics.mapValues(_.intValue()).toSeq: _*),
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 350d0888a3510c8869f56673a066747d32a9746a..e5fdbe1b7af75ad92a057eed6c2008a10c32b2fd 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -1206,6 +1206,15 @@ public class JavaAPISuite implements Serializable {
     JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
     JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics,
       StorageLevel.MEMORY_AND_DISK());
+
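+    // Exercise the new Class-based overload; assumes com.google.common.collect.Maps
+    // (Guava) and kafka.serializer.StringDecoder are imported at the top of this file.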
+    HashMap<String, String> kafkaParams = Maps.newHashMap();
+    kafkaParams.put("zk.connect","localhost:12345");
+    kafkaParams.put("groupid","consumer-group");
+    JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
+      StorageLevel.MEMORY_AND_DISK());
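+    // As with test1 and test2, this only checks that the call type-checks; the stream is never started.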
   }
 
   @Test