diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 5c79c6905801c82f5e19e21b94450ca6db0204a3..8577803743c8e06614566327017b109c4efe2f93 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.{NullWritable, Text}
 import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
+import org.apache.spark.TaskContext
 import org.apache.spark.ml.feature.LabeledPoint
 import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
 import org.apache.spark.mllib.util.MLUtils
@@ -159,8 +160,10 @@ private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with DataSour
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     (file: PartitionedFile) => {
-      val points =
-        new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+      val linesReader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
+
+      val points = linesReader
           .map(_.toString.trim)
           .filterNot(line => line.isEmpty || line.startsWith("#"))
           .map { line =>
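
Each of the format-specific hunks in this patch (libsvm, csv, json, parquet, text, orc) applies the same pattern: materialize the line/record reader in a local val, then register a task-completion listener so the underlying Hadoop input stream is closed even when the iterator is never fully consumed (e.g. under a LIMIT, or when the task fails). A minimal sketch of the pattern, where openReader and process are hypothetical placeholders rather than Spark APIs:

    import org.apache.spark.TaskContext

    // openReader returns an Iterator that is also java.io.Closeable, like HadoopFileLinesReader.
    val reader = openReader(file)
    // Close the reader when the task finishes, whether or not every record was read.
    // TaskContext.get() can return null outside of a running task, hence the Option guard.
    Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => reader.close()))
    reader.map(process)   // stays lazy; cleanup is guaranteed by the listener above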
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
index 18f9b55895a645ad5ce4f7f7e53bcd9681f47c94..83cf26c63a175614396fdee44607a57fb1ef82b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.Closeable
 import java.net.URI
 
 import org.apache.hadoop.conf.Configuration
@@ -30,7 +31,8 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
  * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
  * in that file.
  */
-class HadoopFileLinesReader(file: PartitionedFile, conf: Configuration) extends Iterator[Text] {
+class HadoopFileLinesReader(
+    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
   private val iterator = {
     val fileSplit = new FileSplit(
       new Path(new URI(file.filePath)),
@@ -48,4 +50,6 @@ class HadoopFileLinesReader(file: PartitionedFile, conf: Configuration) extends
   override def hasNext: Boolean = iterator.hasNext
 
   override def next(): Text = iterator.next()
+
+  override def close(): Unit = iterator.close()
 }
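
Making HadoopFileLinesReader a Closeable puts the responsibility for releasing it on the caller; the format implementations in this patch do so via a task-completion listener, but code running where TaskContext.get() returns null (the case the Option guard accounts for, e.g. a plain unit test) should close it explicitly. A hedged sketch, assuming file and conf are an already-constructed PartitionedFile and Hadoop Configuration:

    val linesReader = new HadoopFileLinesReader(file, conf)
    try {
      linesReader.foreach(text => println(text.toString))
    } finally {
      linesReader.close()   // delegates to the underlying RecordReader, freeing the input stream
    }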
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
index f03ae94d55838ff5e45f96505dec5af42812f0ba..938af25a96844678dc163d99c8980fa6b50bb63b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.Closeable
+
 import org.apache.hadoop.mapreduce.RecordReader
 
 import org.apache.spark.sql.catalyst.InternalRow
@@ -27,7 +29,8 @@ import org.apache.spark.sql.catalyst.InternalRow
  * Note that this returns [[Object]]s instead of [[InternalRow]] because we rely on erasure to pass
  * column batches by pretending they are rows.
  */
-class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T] {
+class RecordReaderIterator[T](
+    private[this] var rowReader: RecordReader[_, T]) extends Iterator[T] with Closeable {
   private[this] var havePair = false
   private[this] var finished = false
 
@@ -38,7 +41,7 @@ class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T]
         // Close and release the reader here; close() will also be called when the task
         // completes, but for tasks that read from many files, it helps to release the
         // resources early.
-        rowReader.close()
+        close()
       }
       havePair = !finished
     }
@@ -52,4 +55,18 @@ class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T]
     havePair = false
     rowReader.getCurrentValue
   }
+
+  override def close(): Unit = {
+    if (rowReader != null) {
+      try {
+        // Close the reader and release it. Note: it's very important that we don't close the
+        // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+        // older Hadoop 2.x releases. That bug can lead to non-deterministic corruption issues
+        // when reading compressed input.
+        rowReader.close()
+      } finally {
+        rowReader = null
+      }
+    }
+  }
 }
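
close() above nulls out rowReader so that a second call becomes a no-op. That matters because the iterator already closes itself when hasNext reaches the end of the input, and the task-completion listeners added elsewhere in this patch invoke close() again when the task finishes; as the comment notes, double-closing the Hadoop reader is what triggers MAPREDUCE-5918 on older Hadoop 2.x releases. Illustrative sketch only, with reader and consume as placeholders:

    val it = new RecordReaderIterator(reader)
    Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => it.close()))
    while (it.hasNext) { consume(it.next()) }   // hasNext calls close() once the input is exhausted
    // At task completion the listener calls it.close() again; rowReader is already null,
    // so the call is a no-op and the underlying reader is never closed twice.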
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 9a118fe5a273d83b656c9558e5ad7e303ce64aab..9610746a81ef73549c49b3532484da2d7fe564e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce._
 
+import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -112,7 +113,9 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
     (file: PartitionedFile) => {
       val lineIterator = {
         val conf = broadcastedHadoopConf.value.value
-        new HadoopFileLinesReader(file, conf).map { line =>
+        val linesReader = new HadoopFileLinesReader(file, conf)
+        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
+        linesReader.map { line =>
           new String(line.getBytes, 0, line.getLength, csvOptions.charset)
         }
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 7421314df7aa5c3d84767ebf011b6e8261965307..6882a6cdcac26eb3ebe11a945bfbb1ee6da2b877 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -104,7 +105,9 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
 
     (file: PartitionedFile) => {
-      val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString)
+      val linesReader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
+      val lines = linesReader.map(_.toString)
       val parser = new JacksonParser(requiredSchema, columnNameOfCorruptRecord, parsedOptions)
       lines.flatMap(parser.parse)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index e7c3545630fea5b6b9683ae1251ff174d1e59791..4a308ff1a32f88e379305feba114fc458d351038 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -37,7 +37,7 @@ import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.parquet.schema.MessageType
 import org.slf4j.bridge.SLF4JBridgeHandler
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
@@ -388,6 +388,7 @@ class ParquetFileFormat
       }
 
       val iter = new RecordReaderIterator(parquetReader)
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
 
       // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
       if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index a0c3fd53fb53bab64acacdfa18df2fd6187068d9..a875b01ec2d7a8d78d5c289d592e6c5254bd1cb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.io.{NullWritable, Text}
 import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
+import org.apache.spark.TaskContext
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -100,6 +101,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
     (file: PartitionedFile) => {
       val reader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => reader.close()))
 
       if (requiredSchema.isEmpty) {
         val emptyUnsafeRow = new UnsafeRow(0)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 03b508e11aa76772bad47bd590b7cdadc697a3fa..15b72d8d2179f0972fcc6170c55de0cc2e8760d5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, Re
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 
+import org.apache.spark.TaskContext
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -146,12 +147,15 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
           new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, fileSplit.getLength)
         }
 
+        val recordsIterator = new RecordReaderIterator[OrcStruct](orcRecordReader)
+        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => recordsIterator.close()))
+
         // Unwraps `OrcStruct`s to `UnsafeRow`s
         OrcRelation.unwrapOrcStructs(
           conf,
           requiredSchema,
           Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
-          new RecordReaderIterator[OrcStruct](orcRecordReader))
+          recordsIterator)
       }
     }
   }