diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 20fda95154d6b0e6747ced3e8ac9ad0f625eda61..4448796b1620d1b3244f276da25963d70df40a32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -59,7 +59,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       if (files.fileFormat.toString == "TestFileFormat" ||
          files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
          files.fileFormat.toString == "ORC" ||
-         files.fileFormat.isInstanceOf[json.DefaultSource]) &&
+         files.fileFormat.isInstanceOf[json.DefaultSource] ||
+         files.fileFormat.isInstanceOf[text.DefaultSource]) &&
          files.sqlContext.conf.useFileScan =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
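
With text.DefaultSource added to the whitelist above, text relations can also be planned through FileSourceStrategy's new scan path when useFileScan is enabled. A minimal sketch of exercising that path from user code follows; it assumes an existing SparkContext `sc`, an illustrative input path, and that the key "spark.sql.sources.fileScan" is the SQLConf setting behind `useFileScan`, which is an assumption rather than something stated in this patch.

// Hypothetical smoke test: the conf key below is assumed to back
// `files.sqlContext.conf.useFileScan`.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.length

val sqlContext = SQLContext.getOrCreate(sc)
sqlContext.setConf("spark.sql.sources.fileScan", "true")

// Reading a text file now plans through FileSourceStrategy's scan path
// instead of the legacy buildInternalScan path.
val df = sqlContext.read.text("/tmp/sample.txt")
df.filter(length(df("value")) > 0).count()
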
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 5cfc9e9afad3241a097d0f5db6135e5605508d63..d6ab5fc56e7c27be871b7a6342e607aa18a35062 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
@@ -30,7 +31,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, HadoopFileLinesReader, PartitionedFile}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -125,6 +126,31 @@ class DefaultSource extends FileFormat with DataSourceRegister {
           }
         }
   }
+
+  override def buildReader(
+      sqlContext: SQLContext,
+      partitionSchema: StructType,
+      dataSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val broadcastedConf =
+      sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+
+    file => {
+      val unsafeRow = new UnsafeRow(1)
+      val bufferHolder = new BufferHolder(unsafeRow)
+      val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
+
+      new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line =>
+        // Writes to an UnsafeRow directly
+        bufferHolder.reset()
+        unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
+        unsafeRow.setTotalSize(bufferHolder.totalSize())
+        unsafeRow
+      }
+    }
+  }
 }
 
 class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext)
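
The buildReader closure above reuses a single UnsafeRow per file: each Text line is copied into that row through BufferHolder and UnsafeRowWriter, and the same mutable row instance is yielded for every record, so consumers that buffer rows need to copy them first. Below is a standalone sketch of that write pattern, decoupled from HadoopFileLinesReader; the sample input bytes are made up for illustration.

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}

// One-column rows, mirroring the text source's single "value" field.
val unsafeRow = new UnsafeRow(1)
val bufferHolder = new BufferHolder(unsafeRow)
val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)

// Illustrative input; in the data source these bytes come from HadoopFileLinesReader.
val lines = Seq("spark", "text source").map(_.getBytes("UTF-8"))

val rows = lines.iterator.map { bytes =>
  bufferHolder.reset()                             // rewind the variable-length buffer
  unsafeRowWriter.write(0, bytes, 0, bytes.length) // copy the UTF-8 bytes into column 0
  unsafeRow.setTotalSize(bufferHolder.totalSize()) // record the final row size
  unsafeRow                                        // the same mutable row is yielded each time
}
rows.foreach(row => println(row.getUTF8String(0)))
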