Commit 816f359c, authored by Wenchen Fan, committed by Cheng Lian

[SPARK-14114][SQL] implement buildReader for text data source

## What changes were proposed in this pull request?

This PR implements `buildReader` for the text data source and enables it in the new data source code path.
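
For context, `buildReader` returns a `PartitionedFile => Iterator[InternalRow]` function that the new `FileSourceStrategy` code path invokes once per file split. User-facing behavior is unchanged; below is a minimal sketch of exercising the text source end to end (a live `SQLContext` named `sqlContext` and the input path are assumptions for illustration, not part of this commit):

```scala
// Each line of the input becomes one row with a single "value" column; with
// the new code path enabled, this scan goes through DefaultSource.buildReader.
val df = sqlContext.read.text("/tmp/logs.txt")
df.filter(df("value").contains("ERROR")).show()
```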

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #11934 from cloud-fan/text.
Parent: 7320f9bd
FileSourceStrategy.scala:
@@ -59,7 +59,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       if (files.fileFormat.toString == "TestFileFormat" ||
          files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
          files.fileFormat.toString == "ORC" ||
-         files.fileFormat.isInstanceOf[json.DefaultSource]) &&
+         files.fileFormat.isInstanceOf[json.DefaultSource] ||
+         files.fileFormat.isInstanceOf[text.DefaultSource]) &&
          files.sqlContext.conf.useFileScan =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
text/DefaultSource.scala:
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
@@ -30,7 +31,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, HadoopFileLinesReader, PartitionedFile}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -125,6 +126,31 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       }
     }
   }
+
+  override def buildReader(
+      sqlContext: SQLContext,
+      partitionSchema: StructType,
+      dataSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val broadcastedConf =
+      sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+
+    file => {
+      val unsafeRow = new UnsafeRow(1)
+      val bufferHolder = new BufferHolder(unsafeRow)
+      val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
+
+      new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line =>
+        // Writes to an UnsafeRow directly
+        bufferHolder.reset()
+        unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
+        unsafeRow.setTotalSize(bufferHolder.totalSize())
+        unsafeRow
+      }
+    }
+  }
 }
 
 class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext)
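Beyond the diff itself, the notable pattern in `buildReader` is writing each text line straight into a reused single-column `UnsafeRow` rather than allocating a row object per record. Here is a standalone sketch of that pattern against the same Spark-internal APIs the hunk above uses (the `"hello"` input is illustrative only):

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
import org.apache.spark.unsafe.types.UTF8String

// One row, buffer holder, and writer, reused across records (as buildReader does).
val row = new UnsafeRow(1)
val holder = new BufferHolder(row)
val writer = new UnsafeRowWriter(holder, 1)

val bytes = "hello".getBytes("UTF-8")
holder.reset()                           // rewind the shared buffer
writer.write(0, bytes, 0, bytes.length)  // column 0 := the raw UTF-8 bytes
row.setTotalSize(holder.totalSize())     // record the final variable-length size

assert(row.getUTF8String(0) == UTF8String.fromString("hello"))
```

Since the same mutable `UnsafeRow` instance is returned for every line, downstream consumers that buffer rows must copy them first; for a straight scan-and-consume pass, the reuse avoids a per-record allocation.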