From 816f359cf043ef719a0bc7df0506a3a830fff70d Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Wed, 30 Mar 2016 17:32:53 +0800
Subject: [PATCH] [SPARK-14114][SQL] implement buildReader for text data source

## What changes were proposed in this pull request?

This PR implements buildReader for text data source and enable it in the new data source code path.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #11934 from cloud-fan/text.
---
 .../datasources/FileSourceStrategy.scala      |  3 +-
 .../datasources/text/DefaultSource.scala      | 28 ++++++++++++++++++-
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 20fda95154..4448796b16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -59,7 +59,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       if (files.fileFormat.toString == "TestFileFormat" ||
          files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
          files.fileFormat.toString == "ORC" ||
-         files.fileFormat.isInstanceOf[json.DefaultSource]) &&
+         files.fileFormat.isInstanceOf[json.DefaultSource] ||
+         files.fileFormat.isInstanceOf[text.DefaultSource]) &&
          files.sqlContext.conf.useFileScan =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 5cfc9e9afa..d6ab5fc56e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
@@ -30,7 +31,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, HadoopFileLinesReader, PartitionedFile}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -125,6 +126,31 @@ class DefaultSource extends FileFormat with DataSourceRegister {
           }
         }
   }
+
+  override def buildReader(
+      sqlContext: SQLContext,
+      partitionSchema: StructType,
+      dataSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val broadcastedConf =
+      sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+
+    file => {
+      val unsafeRow = new UnsafeRow(1)
+      val bufferHolder = new BufferHolder(unsafeRow)
+      val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
+
+      new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line =>
+        // Writes to an UnsafeRow directly
+        bufferHolder.reset()
+        unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
+        unsafeRow.setTotalSize(bufferHolder.totalSize())
+        unsafeRow
+      }
+    }
+  }
 }
 
 class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext)
-- 
GitLab