Commit 83775bc7 authored by Wenchen Fan

[SPARK-14158][SQL] implement buildReader for json data source

## What changes were proposed in this pull request?

This PR implements buildReader for the json data source and enables it in the new data source code path.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #11960 from cloud-fan/json.
parent 63b200e8
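
For orientation, a minimal, hypothetical usage sketch (not part of the patch): once the new data source code path is enabled, a plain JSON read is planned by FileSourceStrategy and served by the buildReader added in this commit. The schema, path, and `sqlContext` below are placeholders.

```scala
// Hypothetical end-to-end usage: with the new code path enabled, this read is
// planned through FileSourceStrategy and executed via the JSON buildReader below.
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val schema = new StructType().add("id", LongType).add("name", StringType)

// `sqlContext` is the usual shell SQLContext; the path is a placeholder.
val df = sqlContext.read.schema(schema).json("/tmp/events.json")
df.select("name").show()
```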
@@ -58,7 +58,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
     case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _))
       if (files.fileFormat.toString == "TestFileFormat" ||
          files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
-         files.fileFormat.toString == "ORC") &&
+         files.fileFormat.toString == "ORC" ||
+         files.fileFormat.isInstanceOf[json.DefaultSource]) &&
          files.sqlContext.conf.parquetFileScan =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
@@ -138,7 +139,6 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
     val splitFiles = selectedPartitions.flatMap { partition =>
       partition.files.flatMap { file =>
-        assert(file.getLen != 0, file.toString)
         (0L to file.getLen by maxSplitBytes).map { offset =>
           val remaining = file.getLen - offset
           val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
......
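
Note that with the assertion removed, zero-length files are now tolerated by the split loop above: an empty file simply produces a single zero-length split. A small, self-contained sketch of that arithmetic, with made-up sizes rather than real byte counts:

```scala
// Illustration of the split computation shown above, using tiny made-up numbers.
val maxSplitBytes = 4L

def splitsFor(fileLen: Long): Seq[(Long, Long)] =
  (0L to fileLen by maxSplitBytes).map { offset =>
    val remaining = fileLen - offset
    val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
    (offset, size)
  }

splitsFor(10L)  // (0,4), (4,4), (8,2)
splitsFor(0L)   // (0,0) -- an empty file yields one zero-length split
```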
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, LineRecordReader}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

/**
* An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
* in that file.
*/
class HadoopFileLinesReader(file: PartitionedFile, conf: Configuration) extends Iterator[Text] {
  private val iterator = {
    val fileSplit = new FileSplit(
      new Path(new URI(file.filePath)),
      file.start,
      file.length,
      // TODO: Implement Locality
      Array.empty)
    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
    val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
    val reader = new LineRecordReader()
    reader.initialize(fileSplit, hadoopAttemptContext)
    new RecordReaderIterator(reader)
  }

  override def hasNext: Boolean = iterator.hasNext

  override def next(): Text = iterator.next()
}
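
A minimal, hypothetical helper showing how this adaptor is meant to be called; `PartitionedFile` and `Configuration` are the same types used above, and the call mirrors what the JSON buildReader does further down:

```scala
// Hypothetical call site (not part of the patch): read one PartitionedFile as
// an iterator of its lines, the way the JSON data source's buildReader does.
def readLines(file: PartitionedFile, hadoopConf: Configuration): Iterator[String] =
  new HadoopFileLinesReader(file, hadoopConf).map(_.toString)
```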
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.json
 import java.io.CharArrayWriter

 import com.fasterxml.jackson.core.JsonFactory
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
@@ -32,7 +33,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
@@ -120,6 +122,39 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     }
   }

+  override def buildReader(
+      sqlContext: SQLContext,
+      partitionSchema: StructType,
+      dataSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val broadcastedConf =
+      sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+
+    val parsedOptions: JSONOptions = new JSONOptions(options)
+    val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
+      .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
+
+    val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
+    val joinedRow = new JoinedRow()
+
+    file => {
+      val lines = new HadoopFileLinesReader(file, broadcastedConf.value.value).map(_.toString)
+
+      val rows = JacksonParser.parseJson(
+        lines,
+        dataSchema,
+        columnNameOfCorruptRecord,
+        parsedOptions)
+
+      val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+      rows.map { row =>
+        appendPartitionColumns(joinedRow(row, file.partitionValues))
+      }
+    }
+  }
+
   private def createBaseRdd(sqlContext: SQLContext, inputPaths: Seq[FileStatus]): RDD[String] = {
     val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
     val conf = job.getConfiguration
......
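
To make the new contract concrete: buildReader returns a closure that the new scan path invokes once per PartitionedFile. A simplified, hypothetical sketch of that consumption (not the actual scan code; `jsonSource`, `filesInPartition`, and the schemas are assumed to come from the planner):

```scala
// Illustrative only: how the closure returned by buildReader is consumed.
// All names below are assumptions, not identifiers from this patch.
val readFile: PartitionedFile => Iterator[InternalRow] =
  jsonSource.buildReader(
    sqlContext,
    partitionSchema,
    dataSchema,
    filters = Nil,
    options = Map.empty)

// A single task then concatenates the rows of the files assigned to it.
val rows: Iterator[InternalRow] =
  filesInPartition.iterator.flatMap(file => readFile(file))
```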
@@ -250,7 +250,7 @@ object JacksonParser extends Logging {
       new GenericArrayData(values.toArray)
     }

-  private def parseJson(
+  def parseJson(
       input: Iterator[String],
       schema: StructType,
       columnNameOfCorruptRecords: String,
......