Commit bf2f233e authored by gatorsmile, committed by Wenchen Fan

[SPARK-19092][SQL][BACKPORT-2.1] Save() API of DataFrameWriter should not scan all the saved files #16481

### What changes were proposed in this pull request?

#### This PR backports https://github.com/apache/spark/pull/16481 to Spark 2.1
---
`DataFrameWriter`'s [save() API](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L207) performs an unnecessary full filesystem scan of the files it just saved. Since save() is the most basic/core API in `DataFrameWriter`, we should avoid that scan on this path.
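
As a rough illustration of the affected path (a minimal sketch, not the patch itself; the `SparkSession` wiring and the `/tmp` output path below are assumptions), the plain `DataFrameWriter` save path now goes through a `Unit`-returning `DataSource.write`, so it no longer lists the files it just wrote; only CTAS-style commands use the new `writeAndRead` to build a `BaseRelation` afterwards:

```scala
// Minimal sketch (assumes a local SparkSession and a writable scratch path).
// After this change, DataFrameWriter.save()/parquet() goes through the
// Unit-returning DataSource.write, so the write does not re-scan its own
// output; only CTAS-style commands call writeAndRead to build a BaseRelation.
import org.apache.spark.sql.SparkSession

object SaveWithoutScanExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("save-without-scan")
      .getOrCreate()

    val df = spark.range(0, 100).selectExpr("id", "id % 10 as bucket")

    // Executes InsertIntoHadoopFsRelationCommand only; no full listing of the
    // saved files happens on this path after the patch.
    df.write
      .mode("overwrite")
      .partitionBy("bucket")
      .parquet("/tmp/save-without-scan") // hypothetical scratch path

    spark.stop()
  }
}
```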

### How was this patch tested?
Added and modified the test cases
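
For illustration, the new checks in `PartitionedTablePerfStatsSuite` (shown in the diff below) reset the Hive catalog metrics before creating the table and then assert that the write discovered no files. A hedged standalone sketch of the same idea, assuming the `HiveCatalogMetrics` counters from Spark's `org.apache.spark.metrics.source` package and a local session:

```scala
// Hedged sketch of the regression check: after resetting the catalog metrics,
// a partitioned write should not discover any files, because save() no longer
// scans the output it just produced.
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession

object NoScanOnWriteCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("no-scan-on-write")
      .getOrCreate()

    HiveCatalogMetrics.reset()

    spark.range(10)
      .selectExpr("id as fieldOne", "id % 2 as partCol1")
      .write
      .partitionBy("partCol1")
      .mode("overwrite")
      .parquet("/tmp/no-scan-on-write") // hypothetical scratch path

    // With SPARK-19092 applied, the plain write path lists no files.
    assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
    spark.stop()
  }
}
```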

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16588 from gatorsmile/backport-19092.
parent db37049d
@@ -224,7 +224,7 @@ case class CreateDataSourceTableAsSelectCommand(
catalogTable = Some(table))
val result = try {
dataSource.write(mode, df)
dataSource.writeAndRead(mode, df)
} catch {
case ex: AnalysisException =>
logError(s"Failed to write to table $tableName in $mode mode", ex)
@@ -413,10 +413,82 @@ case class DataSource(
relation
}
/** Writes the given [[DataFrame]] out to this [[DataSource]]. */
def write(
mode: SaveMode,
data: DataFrame): BaseRelation = {
/**
* Writes the given [[DataFrame]] out in this [[FileFormat]].
*/
private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
// Don't glob path for the write path. The contracts here are:
// 1. Only one output path can be specified on the write path;
// 2. Output path must be a legal HDFS style file system path;
// 3. It's OK that the output path doesn't exist yet;
val allPaths = paths ++ caseInsensitiveOptions.get("path")
val outputPath = if (allPaths.length == 1) {
val path = new Path(allPaths.head)
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else {
throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
s"got: ${allPaths.mkString(", ")}")
}
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
PartitioningUtils.validatePartitionColumn(
data.schema, partitionColumns, caseSensitive)
// If we are appending to a table that already exists, make sure the partitioning matches
// up. If we fail to load the table for whatever reason, ignore the check.
if (mode == SaveMode.Append) {
val existingPartitionColumns = Try {
getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
}.getOrElse(Seq.empty[String])
// TODO: Case sensitivity.
val sameColumns =
existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
if (existingPartitionColumns.nonEmpty && !sameColumns) {
throw new AnalysisException(
s"""Requested partitioning does not match existing partitioning.
|Existing partitioning columns:
| ${existingPartitionColumns.mkString(", ")}
|Requested partitioning columns:
| ${partitionColumns.mkString(", ")}
|""".stripMargin)
}
}
// SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
// not need to have the query as child, to avoid to analyze an optimized query,
// because InsertIntoHadoopFsRelationCommand will be optimized first.
val columns = partitionColumns.map { name =>
val plan = data.logicalPlan
plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
throw new AnalysisException(
s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
}.asInstanceOf[Attribute]
}
// For partitioned relation r, r.schema's column ordering can be different from the column
// ordering of data.logicalPlan (partition columns are all moved after data column). This
// will be adjusted within InsertIntoHadoopFsRelation.
val plan =
InsertIntoHadoopFsRelationCommand(
outputPath = outputPath,
staticPartitionKeys = Map.empty,
customPartitionLocations = Map.empty,
partitionColumns = columns,
bucketSpec = bucketSpec,
fileFormat = format,
refreshFunction = _ => Unit, // No existing table needs to be refreshed.
options = options,
query = data.logicalPlan,
mode = mode,
catalogTable = catalogTable)
sparkSession.sessionState.executePlan(plan).toRdd
}
/**
* Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] for
* the following reading.
*/
def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}
@@ -425,74 +497,27 @@ case class DataSource(
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
case format: FileFormat =>
// Don't glob path for the write path. The contracts here are:
// 1. Only one output path can be specified on the write path;
// 2. Output path must be a legal HDFS style file system path;
// 3. It's OK that the output path doesn't exist yet;
val allPaths = paths ++ caseInsensitiveOptions.get("path")
val outputPath = if (allPaths.length == 1) {
val path = new Path(allPaths.head)
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else {
throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
s"got: ${allPaths.mkString(", ")}")
}
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
PartitioningUtils.validatePartitionColumn(
data.schema, partitionColumns, caseSensitive)
// If we are appending to a table that already exists, make sure the partitioning matches
// up. If we fail to load the table for whatever reason, ignore the check.
if (mode == SaveMode.Append) {
val existingPartitionColumns = Try {
getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
}.getOrElse(Seq.empty[String])
// TODO: Case sensitivity.
val sameColumns =
existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
if (existingPartitionColumns.nonEmpty && !sameColumns) {
throw new AnalysisException(
s"""Requested partitioning does not match existing partitioning.
|Existing partitioning columns:
| ${existingPartitionColumns.mkString(", ")}
|Requested partitioning columns:
| ${partitionColumns.mkString(", ")}
|""".stripMargin)
}
}
// SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
// not need to have the query as child, to avoid to analyze an optimized query,
// because InsertIntoHadoopFsRelationCommand will be optimized first.
val columns = partitionColumns.map { name =>
val plan = data.logicalPlan
plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
throw new AnalysisException(
s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
}.asInstanceOf[Attribute]
}
// For partitioned relation r, r.schema's column ordering can be different from the column
// ordering of data.logicalPlan (partition columns are all moved after data column). This
// will be adjusted within InsertIntoHadoopFsRelation.
val plan =
InsertIntoHadoopFsRelationCommand(
outputPath = outputPath,
staticPartitionKeys = Map.empty,
customPartitionLocations = Map.empty,
partitionColumns = columns,
bucketSpec = bucketSpec,
fileFormat = format,
refreshFunction = _ => Unit, // No existing table needs to be refreshed.
options = options,
query = data.logicalPlan,
mode = mode,
catalogTable = catalogTable)
sparkSession.sessionState.executePlan(plan).toRdd
writeInFileFormat(format, mode, data)
// Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
case _ =>
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
}
/**
* Writes the given [[DataFrame]] out to this [[DataSource]].
*/
def write(mode: SaveMode, data: DataFrame): Unit = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}
providingClass.newInstance() match {
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
case format: FileFormat =>
writeInFileFormat(format, mode, data)
case _ =>
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
@@ -62,17 +62,12 @@ class PartitionedTablePerfStatsSuite
}
private def setupPartitionedHiveTable(
tableName: String, dir: File, scale: Int,
clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
.partitionBy("partCol1", "partCol2")
.mode("overwrite")
.parquet(dir.getAbsolutePath)
if (clearMetricsBeforeCreate) {
HiveCatalogMetrics.reset()
}
spark.sql(s"""
|create external table $tableName (fieldOne long)
|partitioned by (partCol1 int, partCol2 int)
@@ -88,17 +83,12 @@ class PartitionedTablePerfStatsSuite
}
private def setupPartitionedDatasourceTable(
tableName: String, dir: File, scale: Int,
clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
.partitionBy("partCol1", "partCol2")
.mode("overwrite")
.parquet(dir.getAbsolutePath)
if (clearMetricsBeforeCreate) {
HiveCatalogMetrics.reset()
}
spark.sql(s"""
|create table $tableName (fieldOne long, partCol1 int, partCol2 int)
|using parquet
@@ -271,8 +261,8 @@ class PartitionedTablePerfStatsSuite
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
withTable("test") {
withTempDir { dir =>
setupPartitionedDatasourceTable(
"test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
HiveCatalogMetrics.reset()
setupPartitionedDatasourceTable("test", dir, scale = 10, repair = false)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
}
@@ -285,8 +275,7 @@ class PartitionedTablePerfStatsSuite
withTable("test") {
withTempDir { dir =>
HiveCatalogMetrics.reset()
setupPartitionedHiveTable(
"test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
setupPartitionedHiveTable("test", dir, scale = 10, repair = false)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
}
@@ -416,12 +405,8 @@ class PartitionedTablePerfStatsSuite
})
executorPool.shutdown()
executorPool.awaitTermination(30, TimeUnit.SECONDS)
// check the cache hit, we use the metric of METRIC_FILES_DISCOVERED and
// METRIC_PARALLEL_LISTING_JOB_COUNT to check this, while the lock take effect,
// only one thread can really do the build, so the listing job count is 2, the other
// one is cache.load func. Also METRIC_FILES_DISCOVERED is $partition_num * 2
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 100)
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 50)
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
}
}
}