diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index d3601cca832b2a4fdf0ec26825a7ff38b3fbdb57..aaef7c74eea33ee6f6c6180129662ae484758e71 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -19,7 +19,6 @@ package org.apache.spark.input
 
 import scala.collection.JavaConversions._
 
-import org.apache.hadoop.conf.{Configuration, Configurable}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.JobContext
@@ -38,18 +37,13 @@ private[spark] class WholeTextFileInputFormat
 
   override protected def isSplitable(context: JobContext, file: Path): Boolean = false
 
-  private var conf: Configuration = _
-  def setConf(c: Configuration) {
-    conf = c
-  }
-  def getConf: Configuration = conf
-
   override def createRecordReader(
       split: InputSplit,
       context: TaskAttemptContext): RecordReader[String, String] = {
 
-    val reader = new WholeCombineFileRecordReader(split, context)
-    reader.setConf(conf)
+    val reader =
+      new ConfigurableCombineFileRecordReader(split, context, classOf[WholeTextFileRecordReader])
+    reader.setConf(getConf)
     reader
   }
 
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index 6d59b24eb0596971baf7cf7e815e93be26a9e977..1b1131b9b8831d591d3c9ee229f597a9bd8dd568 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.input
 
-import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.hadoop.conf.{Configuration, Configurable => HConfigurable}
 import com.google.common.io.{ByteStreams, Closeables}
 
 import org.apache.hadoop.io.Text
@@ -27,6 +27,18 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecor
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 
+
+/**
+ * A trait to implement [[org.apache.hadoop.conf.Configurable Configurable]] interface.
+ */
+private[spark] trait Configurable extends HConfigurable {
+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
+}
+
 /**
  * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
  * out in a key-value pair, where the key is the file path and the value is the entire content of
@@ -38,12 +50,6 @@ private[spark] class WholeTextFileRecordReader(
     index: Integer)
   extends RecordReader[String, String] with Configurable {
 
-  private var conf: Configuration = _
-  def setConf(c: Configuration) {
-    conf = c
-  }
-  def getConf: Configuration = conf
-
   private[this] val path = split.getPath(index)
   private[this] val fs = path.getFileSystem(context.getConfiguration)
 
@@ -87,29 +93,24 @@ private[spark] class WholeTextFileRecordReader(
 
 
 /**
- * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
- * out in a key-value pair, where the key is the file path and the value is the entire content of
- * the file.
+ * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]]
+ * that can pass Hadoop Configuration to [[org.apache.hadoop.conf.Configurable Configurable]]
+ * RecordReaders.
  */
-private[spark] class WholeCombineFileRecordReader(
+private[spark] class ConfigurableCombineFileRecordReader[K, V](
     split: InputSplit,
-    context: TaskAttemptContext)
-  extends CombineFileRecordReader[String, String](
+    context: TaskAttemptContext,
+    recordReaderClass: Class[_ <: RecordReader[K, V] with HConfigurable])
+  extends CombineFileRecordReader[K, V](
     split.asInstanceOf[CombineFileSplit],
     context,
-    classOf[WholeTextFileRecordReader]
+    recordReaderClass
   ) with Configurable {
 
-  private var conf: Configuration = _
-  def setConf(c: Configuration) {
-    conf = c
-  }
-  def getConf: Configuration = conf
-
   override def initNextRecordReader(): Boolean = {
     val r = super.initNextRecordReader()
     if (r) {
-      this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
+      this.curReader.asInstanceOf[HConfigurable].setConf(getConf)
     }
     r
   }