diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 11ad4423997f221e184315e08a3161352e4d8c45..2db48f6f35f181e324c61558417dc0fc4a371d54 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -944,12 +944,11 @@ class SparkContext(config: SparkConf) extends Logging {
       classOf[LongWritable],
       classOf[BytesWritable],
       conf = conf)
-    val data = br.map { case (k, v) =>
-      val bytes = v.getBytes
+    br.map { case (k, v) =>
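+      // BytesWritable.getBytes returns the reused, padded backing array, so its length can
+      // exceed the record; copyBytes() returns a fresh array of exactly getLength() bytes.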
+      val bytes = v.copyBytes()
       assert(bytes.length == recordLength, "Byte array does not have correct length")
       bytes
     }
-    data
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index cc52bb1d23cd5d4ebcc8f4f03820a729a44a3a31..0276575d82ced4cb25d2e2aeda66e3a787bcb5cf 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.io._
+import java.nio.ByteBuffer
 import java.util.zip.GZIPOutputStream
 
 import scala.io.Source
@@ -29,7 +30,6 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
 
-import org.apache.spark.input.PortableDataStream
 import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
 import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
 import org.apache.spark.storage.StorageLevel
@@ -231,24 +231,26 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
-  test("binary file input as byte array") {
-    sc = new SparkContext("local", "test")
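+  /**
+   * Writes `testOutputCopies` copies of `testOutput` to a file under `tempDir`, shifting the
+   * bytes of each copy by its index so that the written records are distinguishable.
+   */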
+  private def writeBinaryData(testOutput: Array[Byte], testOutputCopies: Int): File = {
     val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
-    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
-    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
+    val file = new FileOutputStream(outFile)
     val channel = file.getChannel
-    channel.write(bbuf)
+    for (i <- 0 until testOutputCopies) {
+      // Shift values by i so that they're different in the output
+      val alteredOutput = testOutput.map(b => (b + i).toByte)
+      channel.write(ByteBuffer.wrap(alteredOutput))
+    }
     channel.close()
     file.close()
+    outFile
+  }
 
-    val inRdd = sc.binaryFiles(outFileName)
-    val (infile: String, indata: PortableDataStream) = inRdd.collect.head
-
+  test("binary file input as byte array") {
+    sc = new SparkContext("local", "test")
+    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
+    val outFile = writeBinaryData(testOutput, 1)
+    val inRdd = sc.binaryFiles(outFile.getAbsolutePath)
+    val (infile, indata) = inRdd.collect().head
     // Make sure the name and array match
-    assert(infile.contains(outFileName)) // a prefix may get added
+    assert(infile.contains(outFile.getAbsolutePath)) // a prefix may get added
     assert(indata.toArray === testOutput)
@@ -256,159 +258,55 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 
   test("portabledatastream caching tests") {
     sc = new SparkContext("local", "test")
-    val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
-    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
-    val channel = file.getChannel
-    channel.write(bbuf)
-    channel.close()
-    file.close()
-
-    val inRdd = sc.binaryFiles(outFileName).cache()
-    inRdd.foreach{
-      curData: (String, PortableDataStream) =>
-       curData._2.toArray() // force the file to read
-    }
-    val mappedRdd = inRdd.map {
-      curData: (String, PortableDataStream) =>
-        (curData._2.getPath(), curData._2)
-    }
-    val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head
-
+    val outFile = writeBinaryData(testOutput, 1)
+    val inRdd = sc.binaryFiles(outFile.getAbsolutePath).cache()
+    inRdd.foreach(_._2.toArray()) // force the file to be read
-    // Try reading the output back as an object file
-
-    assert(indata.toArray === testOutput)
+    // Check that the bytes read back from the cached RDD match what was written
+    assert(inRdd.values.collect().head.toArray === testOutput)
   }
 
   test("portabledatastream persist disk storage") {
     sc = new SparkContext("local", "test")
-    val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
-    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
-    val channel = file.getChannel
-    channel.write(bbuf)
-    channel.close()
-    file.close()
-
-    val inRdd = sc.binaryFiles(outFileName).persist(StorageLevel.DISK_ONLY)
-    inRdd.foreach{
-      curData: (String, PortableDataStream) =>
-        curData._2.toArray() // force the file to read
-    }
-    val mappedRdd = inRdd.map {
-      curData: (String, PortableDataStream) =>
-        (curData._2.getPath(), curData._2)
-    }
-    val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head
-
-    // Try reading the output back as an object file
-
-    assert(indata.toArray === testOutput)
+    val outFile = writeBinaryData(testOutput, 1)
+    val inRdd = sc.binaryFiles(outFile.getAbsolutePath).persist(StorageLevel.DISK_ONLY)
+    inRdd.foreach(_._2.toArray()) // force the file to be read
+    assert(inRdd.values.collect().head.toArray === testOutput)
   }
 
   test("portabledatastream flatmap tests") {
     sc = new SparkContext("local", "test")
-    val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
+    val outFile = writeBinaryData(testOutput, 1)
+    val inRdd = sc.binaryFiles(outFile.getAbsolutePath)
     val numOfCopies = 3
-    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
-    val channel = file.getChannel
-    channel.write(bbuf)
-    channel.close()
-    file.close()
-
-    val inRdd = sc.binaryFiles(outFileName)
-    val mappedRdd = inRdd.map {
-      curData: (String, PortableDataStream) =>
-        (curData._2.getPath(), curData._2)
-    }
-    val copyRdd = mappedRdd.flatMap {
-      curData: (String, PortableDataStream) =>
-        for (i <- 1 to numOfCopies) yield (i, curData._2)
-    }
-
-    val copyArr: Array[(Int, PortableDataStream)] = copyRdd.collect()
-
-    // Try reading the output back as an object file
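+    // Expand each input file into numOfCopies references to its PortableDataStream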
+    val copyRdd = inRdd.flatMap(curData => (0 until numOfCopies).map(_ => curData._2))
+    val copyArr = copyRdd.collect()
     assert(copyArr.length == numOfCopies)
-    copyArr.foreach{
-      cEntry: (Int, PortableDataStream) =>
-        assert(cEntry._2.toArray === testOutput)
+    for (i <- copyArr.indices) {
+      assert(copyArr(i).toArray === testOutput)
     }
-
   }
 
   test("fixed record length binary file as byte array") {
-    // a fixed length of 6 bytes
-
     sc = new SparkContext("local", "test")
-
-    val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
     val testOutputCopies = 10
-
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
-    val channel = file.getChannel
-    for(i <- 1 to testOutputCopies) {
-      val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-      channel.write(bbuf)
-    }
-    channel.close()
-    file.close()
-
-    val inRdd = sc.binaryRecords(outFileName, testOutput.length)
-    // make sure there are enough elements
+    val outFile = writeBinaryData(testOutput, testOutputCopies)
+    val inRdd = sc.binaryRecords(outFile.getAbsolutePath, testOutput.length)
     assert(inRdd.count == testOutputCopies)
-
-    // now just compare the first one
-    val indata: Array[Byte] = inRdd.collect.head
-    assert(indata === testOutput)
+    val inArr = inRdd.collect()
+    for (i <- inArr.indices) {
+      assert(inArr(i) === testOutput.map(b => (b + i).toByte))
+    }
   }
 
   test ("negative binary record length should raise an exception") {
-    // a fixed length of 6 bytes
     sc = new SparkContext("local", "test")
-
-    val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
-    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
-    val testOutputCopies = 10
-
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
-    val channel = file.getChannel
-    for(i <- 1 to testOutputCopies) {
-      val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-      channel.write(bbuf)
-    }
-    channel.close()
-    file.close()
-
-    val inRdd = sc.binaryRecords(outFileName, -1)
-
+    val outFile = writeBinaryData(Array[Byte](1, 2, 3, 4, 5, 6), 1)
     intercept[SparkException] {
-      inRdd.count
+      sc.binaryRecords(outFile.getAbsolutePath, -1).count()
     }
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 444261da8de6a31e24431a607ab826cac96fad40..4be02e7084f90dc5d308612d4237a6550d3bbd38 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -434,13 +434,12 @@ class StreamingContext private[streaming] (
     conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
     val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
       directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf)
-    val data = br.map { case (k, v) =>
-      val bytes = v.getBytes
+    br.map { case (k, v) =>
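+      // copyBytes() (unlike getBytes) returns a fresh array trimmed to the record's length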
+      val bytes = v.copyBytes()
       require(bytes.length == recordLength, "Byte array does not have correct length. " +
         s"${bytes.length} did not equal recordLength: $recordLength")
       bytes
     }
-    data
   }
 
   /**
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 6fb50a4052712c3057e78d8d670ed6040f7f5940..b5d36a36513ab349a52c293cd4cae3508908ae91 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -84,7 +84,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
           // Verify whether all the elements received are as expected
           // (whether the elements were received one in each interval is not verified)
-          val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
+          val output = outputQueue.asScala.flatten.toArray
           assert(output.length === expectedOutput.size)
           for (i <- output.indices) {
             assert(output(i) === expectedOutput(i))
@@ -155,14 +155,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         // not enough to trigger a batch
         clock.advance(batchDuration.milliseconds / 2)
 
-        val input = Seq(1, 2, 3, 4, 5)
-        input.foreach { i =>
+        val numCopies = 3
+        val input = Array[Byte](1, 2, 3, 4, 5)
+        for (i <- 0 until numCopies) {
           Thread.sleep(batchDuration.milliseconds)
           val file = new File(testDir, i.toString)
-          Files.write(Array[Byte](i.toByte), file)
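+          // Shift the bytes by i so that each generated file has distinct contents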
+          Files.write(input.map(b => (b + i).toByte), file)
           assert(file.setLastModified(clock.getTimeMillis()))
           assert(file.lastModified === clock.getTimeMillis())
-          logInfo("Created file " + file)
+          logInfo(s"Created file $file")
           // Advance the clock after creating the file to avoid a race when
           // setting its modification time
           clock.advance(batchDuration.milliseconds)
@@ -170,10 +171,10 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
-            assert(batchCounter.getNumCompletedBatches === i)
+            assert(batchCounter.getNumCompletedBatches === i + 1)
           }
         }
-
-        val expectedOutput = input.map(i => i.toByte)
-        val obtainedOutput = outputQueue.asScala.flatten.toList.map(i => i(0).toByte)
-        assert(obtainedOutput.toSeq === expectedOutput)
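+        // Each completed batch should hold the records of one file; flatten them back into bytes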
+        val obtainedOutput = outputQueue.asScala.map(_.flatten).toSeq
+        for (i <- obtainedOutput.indices) {
+          assert(obtainedOutput(i) === input.map(b => (b + i).toByte))
+        }
       }
     } finally {
       if (testDir != null) Utils.deleteRecursively(testDir)
@@ -258,7 +259,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val testReceiver = new MultiThreadTestReceiver(numThreads, numRecordsPerThread)
     MultiThreadTestReceiver.haveAllThreadsFinished = false
     val outputQueue = new ConcurrentLinkedQueue[Seq[Long]]
-    def output: Iterable[Long] = outputQueue.asScala.flatMap(x => x)
+    def output: Iterable[Long] = outputQueue.asScala.flatten
 
     // set up the network stream using the test receiver
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>