diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 95fef23ee4f39af81a6b79291e5bccf1efdfad3b..127973b658190130861d676fb8ea26094760a6ad 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -86,6 +86,10 @@ object MimaExcludes {
             // SPARK-5270
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.api.java.JavaRDDLike.isEmpty")
+          ) ++ Seq(
+            // SPARK-5297 Java FileStream does not work with custom key/values
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream")
           )
 
         case v if v.startsWith("1.2") =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index d8695b8e05962887a75884d392c86e991144eda6..9a2254bcdc1f7c11f7bdc34970679f1d310c4312 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -17,14 +17,15 @@
 
 package org.apache.spark.streaming.api.java
 
+import java.io.{Closeable, InputStream}
+import java.lang.{Boolean => JBoolean}
+import java.util.{List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
-import java.io.{Closeable, InputStream}
-import java.util.{List => JList, Map => JMap}
-
 import akka.actor.{Props, SupervisorStrategy}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark.{SparkConf, SparkContext}
@@ -250,21 +251,56 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * Files must be written to the monitored directory by "moving" them from another
    * location within the same file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
+   * @param kClass class of key for reading HDFS file
+   * @param vClass class of value for reading HDFS file
+   * @param fClass class of input format for reading HDFS file
    * @tparam K Key type for reading HDFS file
    * @tparam V Value type for reading HDFS file
    * @tparam F Input format for reading HDFS file
    */
   def fileStream[K, V, F <: NewInputFormat[K, V]](
-      directory: String): JavaPairInputDStream[K, V] = {
-    implicit val cmk: ClassTag[K] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val cmv: ClassTag[V] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-    implicit val cmf: ClassTag[F] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
+      directory: String,
+      kClass: Class[K],
+      vClass: Class[V],
+      fClass: Class[F]): JavaPairInputDStream[K, V] = {
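+    // Build ClassTags from the caller-supplied Class objects so the Scala API
+    // sees the concrete key/value/format types instead of erased AnyRef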
+    implicit val cmk: ClassTag[K] = ClassTag(kClass)
+    implicit val cmv: ClassTag[V] = ClassTag(vClass)
+    implicit val cmf: ClassTag[F] = ClassTag(fClass)
     ssc.fileStream[K, V, F](directory)
   }
 
+  /**
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
+   * @param directory HDFS directory to monitor for new files
+   * @param kClass class of key for reading HDFS file
+   * @param vClass class of value for reading HDFS file
+   * @param fClass class of input format for reading HDFS file
+   * @param filter Function to filter paths to process
+   * @param newFilesOnly Whether to process only new files and ignore existing files in the directory
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[K, V, F <: NewInputFormat[K, V]](
+      directory: String,
+      kClass: Class[K],
+      vClass: Class[V],
+      fClass: Class[F],
+      filter: JFunction[Path, JBoolean],
+      newFilesOnly: Boolean): JavaPairInputDStream[K, V] = {
+    implicit val cmk: ClassTag[K] = ClassTag(kClass)
+    implicit val cmv: ClassTag[V] = ClassTag(vClass)
+    implicit val cmf: ClassTag[F] = ClassTag(fClass)
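+    // Adapt the Java Function to the (Path => Boolean) closure expected by the Scala API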
+    val fn = (x: Path) => filter.call(x).booleanValue()
+    ssc.fileStream[K, V, F](directory, fn, newFilesOnly)
+  }
+
   /**
    * Create an input stream with any arbitrary user implemented actor receiver.
    * @param props Props object defining creation of the actor
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 12cc0de7509d6c3dc7059209a9ecf86348cc21a0..d92e7fe899a09ebd8a7163d15bf4b9a557070b5e 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,13 +17,20 @@
 
 package org.apache.spark.streaming;
 
+import java.io.*;
+import java.lang.Iterable;
+import java.nio.charset.Charset;
+import java.util.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import scala.Tuple2;
 
 import org.junit.Assert;
+import static org.junit.Assert.*;
 import org.junit.Test;
-import java.io.*;
-import java.util.*;
-import java.lang.Iterable;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
@@ -1743,13 +1750,68 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
       StorageLevel.MEMORY_ONLY());
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testTextFileStream() throws IOException {
+    File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"));
+    List<List<String>> expected = fileTestPrepare(testDir);
+
+    JavaDStream<String> input = ssc.textFileStream(testDir.toString());
+    JavaTestUtils.attachTestOutputStream(input);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
   @Test
-  public void testTextFileStream() {
-    JavaDStream<String> test = ssc.textFileStream("/tmp/foo");
+  public void testFileStream() throws IOException {
+    File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"));
+    List<List<String>> expected = fileTestPrepare(testDir);
+
+    JavaPairInputDStream<LongWritable, Text> inputStream = ssc.fileStream(
+      testDir.toString(),
+      LongWritable.class,
+      Text.class,
+      TextInputFormat.class,
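+      // Pass-through filter: accept every path so the prepared test file is processed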
+      new Function<Path, Boolean>() {
+        @Override
+        public Boolean call(Path v1) throws Exception {
+          return Boolean.TRUE;
+        }
+      },
+      true);
+
+    JavaDStream<String> test = inputStream.map(
+      new Function<Tuple2<LongWritable, Text>, String>() {
+        @Override
+        public String call(Tuple2<LongWritable, Text> v1) throws Exception {
+          return v1._2().toString();
+        }
+    });
+
+    JavaTestUtils.attachTestOutputStream(test);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
+
+    assertOrderInvariantEquals(expected, result);
   }
 
   @Test
   public void testRawSocketStream() {
     JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);
   }
+
+  private List<List<String>> fileTestPrepare(File testDir) throws IOException {
+    File existingFile = new File(testDir, "0");
+    Files.write("0\n", existingFile, Charset.forName("UTF-8"));
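+    // Give the file a known, old modification time and verify the filesystem applied it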
+    assertTrue(existingFile.setLastModified(1000) && existingFile.lastModified() == 1000);
+
+    List<List<String>> expected = Arrays.asList(
+      Arrays.asList("0")
+    );
+
+    return expected;
+  }
 }