diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 16c6fdbe5274db02dc340beabac61ead051dee61..eecfdd4222adf517b6fa708291cae02de9296e90 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -657,6 +657,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * Load data from a flat binary file, assuming the length of each record is constant.
    *
+   * '''Note:''' We ensure that the byte array for each record in the resulting RDD
+   * has the provided record length.
+   *
    * @param path Directory to the input data files
    * @param recordLength The length at which to split the records
    * @return An RDD of data with values, represented as byte arrays
@@ -671,7 +674,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       classOf[LongWritable],
       classOf[BytesWritable],
       conf=conf)
-    val data = br.map{ case (k, v) => v.getBytes}
+    val data = br.map { case (k, v) =>
+      val bytes = v.getBytes
+      assert(bytes.length == recordLength, "Byte array does not have correct length")
+      bytes
+    }
     data
   }
 
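The hunk above documents and enforces the fixed-record-length invariant of `SparkContext.binaryRecords`. A minimal usage sketch, assuming a hypothetical directory of 8-byte records (the path and record width are illustrative, not part of this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BinaryRecordsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("binary-records").setMaster("local[2]"))
    val recordLength = 8  // hypothetical fixed record width in bytes
    // Each element of the RDD is a byte array of exactly `recordLength` bytes,
    // as asserted by the change above.
    val records = sc.binaryRecords("/tmp/fixed-width-input", recordLength)
    val allCorrect = records.map(_.length == recordLength).fold(true)(_ && _)
    println(s"all records have length $recordLength: $allCorrect")
    sc.stop()
  }
}
```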
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index d48f3598e33b21fd7aa34d7acfac010458d1bec5..18aaae93b05f2e8aa0313c210c8bc102377d932d 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -21,7 +21,7 @@ from py4j.java_collections import ListConverter
 from py4j.java_gateway import java_import, JavaObject
 
 from pyspark import RDD, SparkConf
-from pyspark.serializers import UTF8Deserializer, CloudPickleSerializer
+from pyspark.serializers import NoOpSerializer, UTF8Deserializer, CloudPickleSerializer
 from pyspark.context import SparkContext
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.dstream import DStream
@@ -251,6 +251,20 @@ class StreamingContext(object):
         """
         return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
 
+    def binaryRecordsStream(self, directory, recordLength):
+        """
+        Create an input stream that monitors a Hadoop-compatible file system
+        for new files and reads them as flat binary files with records of
+        fixed length. Files must be written to the monitored directory by "moving"
+        them from another location within the same file system.
+        File names starting with . are ignored.
+
+        @param directory:       Directory to load data from
+        @param recordLength:    Length of each record in bytes
+        """
+        return DStream(self._jssc.binaryRecordsStream(directory, recordLength), self,
+                       NoOpSerializer())
+
     def _check_serializers(self, rdds):
         # make sure they have same serializer
         if len(set(rdd._jrdd_deserializer for rdd in rdds)) > 1:
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index a8d876d0fa3b359014eb408d4f88d44ba37ca122..608f8e26473a6a2677267b0454d36efda16a6e89 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -21,6 +21,7 @@ import time
 import operator
 import unittest
 import tempfile
+import struct
 
 from pyspark.context import SparkConf, SparkContext, RDD
 from pyspark.streaming.context import StreamingContext
@@ -455,6 +456,20 @@ class StreamingContextTests(PySparkStreamingTestCase):
         self.wait_for(result, 2)
         self.assertEqual([range(10), range(10)], result)
 
+    def test_binary_records_stream(self):
+        d = tempfile.mkdtemp()
+        self.ssc = StreamingContext(self.sc, self.duration)
+        dstream = self.ssc.binaryRecordsStream(d, 10).map(
+            lambda v: struct.unpack("10b", str(v)))
+        result = self._collect(dstream, 2, block=False)
+        self.ssc.start()
+        for name in ('a', 'b'):
+            time.sleep(1)
+            with open(os.path.join(d, name), "wb") as f:
+                f.write(bytearray(range(10)))
+        self.wait_for(result, 2)
+        self.assertEqual([range(10), range(10)], map(lambda v: list(v[0]), result))
+
     def test_union(self):
         input = [range(i + 1) for i in range(3)]
         dstream = self.ssc.queueStream(input)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 8ef0787137845ce84ad88f6692b0391fe43bc8da..ddc435cf1a2e6748366d5606f412a8d55198d038 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -27,10 +27,12 @@ import scala.reflect.ClassTag
 import akka.actor.{Props, SupervisorStrategy}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.spark._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.input.FixedLengthBinaryInputFormat
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream._
@@ -359,6 +361,30 @@ class StreamingContext private[streaming] (
     new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
   }
 
+  /**
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
+   * @param directory HDFS directory to monitor for new files
+   * @param filter Function to filter paths to process
+   * @param newFilesOnly Whether to process only new files and ignore existing files in the directory
+   * @param conf Hadoop configuration
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[
+    K: ClassTag,
+    V: ClassTag,
+    F <: NewInputFormat[K, V]: ClassTag
+  ] (directory: String,
+     filter: Path => Boolean,
+     newFilesOnly: Boolean,
+     conf: Configuration): InputDStream[(K, V)] = {
+    new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf))
+  }
+
   /**
    * Create a input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them as text files (using key as LongWritable, value
@@ -371,6 +397,37 @@ class StreamingContext private[streaming] (
     fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
   }
 
+  /**
+   * :: Experimental ::
+   *
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them as flat binary files, assuming a fixed length per record,
+   * generating one byte array per record. Files must be written to the monitored directory
+   * by "moving" them from another location within the same file system. File names
+   * starting with . are ignored.
+   *
+   * '''Note:''' We ensure that the byte array for each record in the
+   * resulting RDDs of the DStream has the provided record length.
+   *
+   * @param directory HDFS directory to monitor for new files
+   * @param recordLength length of each record in bytes
+   */
+  @Experimental
+  def binaryRecordsStream(
+      directory: String,
+      recordLength: Int): DStream[Array[Byte]] = {
+    val conf = sc_.hadoopConfiguration
+    conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
+    val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
+      directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf)
+    val data = br.map { case (k, v) =>
+      val bytes = v.getBytes
+      assert(bytes.length == recordLength, "Byte array does not have correct length")
+      bytes
+    }
+    data
+  }
+
   /**
    * Create an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
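A usage sketch for the new `binaryRecordsStream` API added above; the monitored directory, batch interval, and 16-byte record width are illustrative only:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BinaryRecordsStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("binary-records-stream").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Every record in each batch RDD is exactly 16 bytes, per the note above.
    val records = ssc.binaryRecordsStream("/tmp/streaming-input", 16)
    records.map(_.length).countByValue().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```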
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 9a2254bcdc1f7c11f7bdc34970679f1d310c4312..0f7ae7a1c7de86b2692983c1221b0ed5c14bd25c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
 import org.apache.spark.rdd.RDD
@@ -177,7 +178,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
 
   /**
    * Create an input stream from network source hostname:port. Data is received using
-   * a TCP socket and the receive bytes it interepreted as object using the given
+   * a TCP socket and the received bytes are interpreted as objects using the given
    * converter.
    * @param hostname      Hostname to connect to for receiving data
    * @param port          Port to connect to for receiving data
@@ -209,6 +210,24 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
     ssc.textFileStream(directory)
   }
 
+  /**
+   * :: Experimental ::
+   *
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them as flat binary files with fixed record lengths,
+   * yielding byte arrays.
+   *
+   * '''Note:''' We ensure that the byte array for each record in the
+   * resulting RDDs of the DStream has the provided record length.
+   *
+   * @param directory HDFS directory to monitor for new files
+   * @param recordLength The length at which to split the records
+   */
+  @Experimental
+  def binaryRecordsStream(directory: String, recordLength: Int): JavaDStream[Array[Byte]] = {
+    ssc.binaryRecordsStream(directory, recordLength)
+  }
+
   /**
    * Create an input stream from network source hostname:port, where data is received
    * as serialized blocks (serialized using the Spark's serializer) that can be directly
@@ -298,6 +317,37 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
     ssc.fileStream[K, V, F](directory, fn, newFilesOnly)
   }
 
+  /**
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
+   * @param directory HDFS directory to monitor for new files
+   * @param kClass class of key for reading HDFS file
+   * @param vClass class of value for reading HDFS file
+   * @param fClass class of input format for reading HDFS file
+   * @param filter Function to filter paths to process
+   * @param newFilesOnly Whether to process only new files and ignore existing files in the directory
+   * @param conf Hadoop configuration
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[K, V, F <: NewInputFormat[K, V]](
+      directory: String,
+      kClass: Class[K],
+      vClass: Class[V],
+      fClass: Class[F],
+      filter: JFunction[Path, JBoolean],
+      newFilesOnly: Boolean,
+      conf: Configuration): JavaPairInputDStream[K, V] = {
+    implicit val cmk: ClassTag[K] = ClassTag(kClass)
+    implicit val cmv: ClassTag[V] = ClassTag(vClass)
+    implicit val cmf: ClassTag[F] = ClassTag(fClass)
+    def fn = (x: Path) => filter.call(x).booleanValue()
+    ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf)
+  }
+
   /**
    * Create an input stream with any arbitrary user implemented actor receiver.
    * @param props Props object defining creation of the actor
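The Java `fileStream` overload above delegates to the Scala overload introduced in the previous file. A minimal sketch of the Scala side, using `TextInputFormat` and a hypothetical directory and configuration key to show how the per-stream `Configuration` is supplied:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.StreamingContext

object FileStreamWithConfSketch {
  // Sketch only: the directory and "example.custom.key" setting are hypothetical.
  def monitorWithConf(ssc: StreamingContext): Unit = {
    val hadoopConf = new Configuration(ssc.sparkContext.hadoopConfiguration)
    hadoopConf.set("example.custom.key", "value")
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "/tmp/monitored-dir",
      (path: Path) => !path.getName.startsWith("."),
      newFilesOnly = true,
      hadoopConf)
    lines.map(_._2.toString).print()
  }
}
```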
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index e7c5639a63499a4f3bb0df2b64789cb905fdc38b..6379b88527ec88a2c41e264ce934ddaba356abf4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
@@ -68,11 +69,13 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
  *   processing semantics are undefined.
  */
 private[streaming]
-class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
+class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
     @transient ssc_ : StreamingContext,
     directory: String,
     filter: Path => Boolean = FileInputDStream.defaultFilter,
-    newFilesOnly: Boolean = true)
+    newFilesOnly: Boolean = true,
+    conf: Option[Configuration] = None)
+    (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
   extends InputDStream[(K, V)](ssc_) {
 
   // This is a def so that it works during checkpoint recovery:
@@ -237,7 +240,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
     val fileRDDs = files.map(file =>{
-      val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)
+      val rdd = conf match {
+        case Some(config) => context.sparkContext.newAPIHadoopFile(
+          file,
+          fm.runtimeClass.asInstanceOf[Class[F]],
+          km.runtimeClass.asInstanceOf[Class[K]],
+          vm.runtimeClass.asInstanceOf[Class[V]],
+          config)
+        case None => context.sparkContext.newAPIHadoopFile[K, V, F](file)
+      }
       if (rdd.partitions.size == 0) {
         logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
           "files that have been \"moved\" to the directory assigned to the file stream. " +
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index bddf51e13042213eae5a985f0854706d42a14a69..01084a457db4f498be570f4df83a905f30c71b94 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -95,6 +95,57 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     }
   }
 
+  test("binary records stream") {
+    var testDir: File = null
+    try {
+      val batchDuration = Seconds(2)
+      testDir = Utils.createTempDir()
+      // Create a file that exists before the StreamingContext is created:
+      val existingFile = new File(testDir, "0")
+      Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+      assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
+
+      // Set up the streaming context and input streams
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        // This `setTime` call ensures that the clock is past the creation time of `existingFile`
+        clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
+        val batchCounter = new BatchCounter(ssc)
+        val fileStream = ssc.binaryRecordsStream(testDir.toString, 1)
+        val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]]
+          with SynchronizedBuffer[Seq[Array[Byte]]]
+        val outputStream = new TestOutputStream(fileStream, outputBuffer)
+        outputStream.register()
+        ssc.start()
+
+        // Advance the clock so that the files are created after StreamingContext starts, but
+        // not enough to trigger a batch
+        clock.addToTime(batchDuration.milliseconds / 2)
+
+        val input = Seq(1, 2, 3, 4, 5)
+        input.foreach { i =>
+          Thread.sleep(batchDuration.milliseconds)
+          val file = new File(testDir, i.toString)
+          Files.write(Array[Byte](i.toByte), file)
+          assert(file.setLastModified(clock.currentTime()))
+          assert(file.lastModified === clock.currentTime)
+          logInfo("Created file " + file)
+          // Advance the clock after creating the file to avoid a race when
+          // setting its modification time
+          clock.addToTime(batchDuration.milliseconds)
+          eventually(eventuallyTimeout) {
+            assert(batchCounter.getNumCompletedBatches === i)
+          }
+        }
+
+        val expectedOutput = input.map(i => i.toByte)
+        val obtainedOutput = outputBuffer.flatten.toList.map(i => i(0).toByte)
+        assert(obtainedOutput === expectedOutput)
+      }
+    } finally {
+      if (testDir != null) Utils.deleteRecursively(testDir)
+    }
+  }
 
   test("file input stream - newFilesOnly = true") {
     testFileStream(newFilesOnly = true)