Commit e3fef0f9 authored by Cheng Lian, committed by Yin Huai

[SPARK-9743] [SQL] Fixes JSONRelation refreshing

PR #7696 added two `HadoopFsRelation.refresh()` calls ([this][1] and [this][2]) in `DataSourceStrategy` to make the test case `InsertSuite.save directly to the path of a JSON table` pass. However, this forces every `HadoopFsRelation` table scan to do a refresh, which can be very expensive for tables with a large number of partitions.

The reason the original test case fails without the `refresh()` calls is that the old JSON relation builds its base RDD directly from the input paths, while `HadoopFsRelation` works with cached `FileStatus`es of leaf files. With the old JSON relation, we could create a temporary table based on a path, write data to that path, and then read the newly written data without refreshing the table. This is no longer true for `HadoopFsRelation`.
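
For illustration, the pattern the test exercises looks roughly like the sketch below (simplified and hypothetical, not the verbatim test; the helper name, table name, and schema are illustrative):

```scala
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

// Simplified sketch: register a temporary JSON table over `path`, overwrite the files
// under `path`, then query the table again. The final query is expected to see the
// newly written files without an explicit table refresh.
def overwriteAndQuery(ctx: SQLContext, df: DataFrame, path: String): Array[Row] = {
  ctx.sql(
    s"""
       |CREATE TEMPORARY TABLE jsonTable (a int, b string)
       |USING json
       |OPTIONS (path '$path')
     """.stripMargin)

  // Writing directly to the table's path changes the set of files on disk,
  // which makes any previously cached file listing stale.
  df.write.mode("overwrite").format("json").save(path)

  ctx.sql("SELECT a, b FROM jsonTable").collect()
}
```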

This PR removes those two expensive refresh calls and instead moves the refresh into `JSONRelation` to fix this issue. We might want to update the `HadoopFsRelation` interface to provide better support for this use case.
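
The core of the change in `JSONRelation`, annotated here for clarity (see the full diff below), overrides the previously `final` package-private `buildScan` overload and refreshes the cached file listing before delegating to the parent implementation:

```scala
// Annotated excerpt of the patch: re-list the leaf files under the table's paths so
// data written to them after the relation was created becomes visible, then run the
// normal HadoopFsRelation scan.
override private[sql] def buildScan(
    requiredColumns: Array[String],
    filters: Array[Filter],
    inputPaths: Array[String],
    broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
  refresh()
  super.buildScan(requiredColumns, filters, inputPaths, broadcastedConf)
}
```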

[1]: https://github.com/apache/spark/blob/ebfd91c542aaead343cb154277fcf9114382fee7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L63
[2]: https://github.com/apache/spark/blob/ebfd91c542aaead343cb154277fcf9114382fee7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L91

Author: Cheng Lian <lian@databricks.com>

Closes #8035 from liancheng/spark-9743/fix-json-relation-refreshing and squashes the following commits:

ec1957d [Cheng Lian] Fixes JSONRelation refreshing
parent be80def0
@@ -60,7 +60,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // Scanning partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
         if t.partitionSpec.partitionColumns.nonEmpty =>
-      t.refresh()
       val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
 
       logInfo {
@@ -88,7 +87,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
 
     // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
-      t.refresh()
       // See buildPartitionedTableScan for the reason that we need to create a shard
       // broadcast HadoopConf.
       val sharedHadoopConf = SparkHadoopUtil.get.conf
...
@@ -22,20 +22,22 @@ import java.io.CharArrayWriter
 
 import com.fasterxml.jackson.core.JsonFactory
 import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.{Text, LongWritable, NullWritable}
+import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
-import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext, Job}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 
 import org.apache.spark.Logging
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionSpec
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.util.SerializableConfiguration
 
 private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
@@ -108,6 +110,15 @@ private[sql] class JSONRelation(
     jsonSchema
   }
 
+  override private[sql] def buildScan(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      inputPaths: Array[String],
+      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
+    refresh()
+    super.buildScan(requiredColumns, filters, inputPaths, broadcastedConf)
+  }
+
   override def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
...
@@ -555,7 +555,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     })
   }
 
-  private[sql] final def buildScan(
+  private[sql] def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
       inputPaths: Array[String],
...
@@ -32,9 +32,9 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
 
   var path: File = null
 
-  override def beforeAll: Unit = {
+  override def beforeAll(): Unit = {
     path = Utils.createTempDir()
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
     caseInsensitiveContext.read.json(rdd).registerTempTable("jt")
     sql(
       s"""
@@ -46,7 +46,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
       """.stripMargin)
   }
 
-  override def afterAll: Unit = {
+  override def afterAll(): Unit = {
     caseInsensitiveContext.dropTempTable("jsonTable")
     caseInsensitiveContext.dropTempTable("jt")
     Utils.deleteRecursively(path)
@@ -110,7 +110,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
     )
 
     // Writing the table to less part files.
-    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 5)
+    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 5)
     caseInsensitiveContext.read.json(rdd1).registerTempTable("jt1")
     sql(
       s"""
@@ -122,7 +122,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
     )
 
     // Writing the table to more part files.
-    val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 10)
+    val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 10)
     caseInsensitiveContext.read.json(rdd2).registerTempTable("jt2")
     sql(
       s"""
...