From bf2f233e49013da54a6accd96c471acafc24df15 Mon Sep 17 00:00:00 2001
From: gatorsmile <>
Date: Mon, 16 Jan 2017 10:58:10 +0800
Subject: [PATCH] [SPARK-19092][SQL][BACKPORT-2.1] Save() API of
 DataFrameWriter should not scan all the saved files #16481

### What changes were proposed in this pull request?

#### This PR is to backport to Spark 2.1
`DataFrameWriter`'s [save() API]( is performing a unnecessary full filesystem scan for the saved files. The save() API is the most basic/core API in `DataFrameWriter`. We should avoid it.

### How was this patch tested?
Added and modified the test cases

Author: gatorsmile <>

Closes #16588 from gatorsmile/backport-19092.
 .../command/createDataSourceTables.scala      |   2 +-
 .../execution/datasources/DataSource.scala    | 163 ++++++++++--------
 .../hive/PartitionedTablePerfStatsSuite.scala |  29 +---
 3 files changed, 102 insertions(+), 92 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 193a2a2cdc..630adb0d99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -224,7 +224,7 @@ case class CreateDataSourceTableAsSelectCommand(
       catalogTable = Some(table))
     val result = try {
-      dataSource.write(mode, df)
+      dataSource.writeAndRead(mode, df)
     } catch {
       case ex: AnalysisException =>
         logError(s"Failed to write to table $tableName in $mode mode", ex)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 31a491fb3d..af70bf7e5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -413,10 +413,82 @@ case class DataSource(
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
-  def write(
-      mode: SaveMode,
-      data: DataFrame): BaseRelation = {
+  /**
+   * Writes the given [[DataFrame]] out in this [[FileFormat]].
+   */
+  private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
+    // Don't glob path for the write path.  The contracts here are:
+    //  1. Only one output path can be specified on the write path;
+    //  2. Output path must be a legal HDFS style file system path;
+    //  3. It's OK that the output path doesn't exist yet;
+    val allPaths = paths ++ caseInsensitiveOptions.get("path")
+    val outputPath = if (allPaths.length == 1) {
+      val path = new Path(allPaths.head)
+      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    } else {
+      throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+        s"got: ${allPaths.mkString(", ")}")
+    }
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    PartitioningUtils.validatePartitionColumn(
+      data.schema, partitionColumns, caseSensitive)
+    // If we are appending to a table that already exists, make sure the partitioning matches
+    // up.  If we fail to load the table for whatever reason, ignore the check.
+    if (mode == SaveMode.Append) {
+      val existingPartitionColumns = Try {
+        getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
+      }.getOrElse(Seq.empty[String])
+      // TODO: Case sensitivity.
+      val sameColumns =
+ ==
+      if (existingPartitionColumns.nonEmpty && !sameColumns) {
+        throw new AnalysisException(
+          s"""Requested partitioning does not match existing partitioning.
+             |Existing partitioning columns:
+             |  ${existingPartitionColumns.mkString(", ")}
+             |Requested partitioning columns:
+             |  ${partitionColumns.mkString(", ")}
+             |""".stripMargin)
+      }
+    }
+    // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
+    // not need to have the query as child, to avoid to analyze an optimized query,
+    // because InsertIntoHadoopFsRelationCommand will be optimized first.
+    val columns = { name =>
+      val plan = data.logicalPlan
+      plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+        throw new AnalysisException(
+          s"Unable to resolve $name given [${", ")}]")
+      }.asInstanceOf[Attribute]
+    }
+    // For partitioned relation r, r.schema's column ordering can be different from the column
+    // ordering of data.logicalPlan (partition columns are all moved after data column).  This
+    // will be adjusted within InsertIntoHadoopFsRelation.
+    val plan =
+      InsertIntoHadoopFsRelationCommand(
+        outputPath = outputPath,
+        staticPartitionKeys = Map.empty,
+        customPartitionLocations = Map.empty,
+        partitionColumns = columns,
+        bucketSpec = bucketSpec,
+        fileFormat = format,
+        refreshFunction = _ => Unit, // No existing table needs to be refreshed.
+        options = options,
+        query = data.logicalPlan,
+        mode = mode,
+        catalogTable = catalogTable)
+    sparkSession.sessionState.executePlan(plan).toRdd
+  }
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] for
+   * the following reading.
+   */
+  def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = {
     if ([CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
@@ -425,74 +497,27 @@ case class DataSource(
       case dataSource: CreatableRelationProvider =>
         dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
       case format: FileFormat =>
-        // Don't glob path for the write path.  The contracts here are:
-        //  1. Only one output path can be specified on the write path;
-        //  2. Output path must be a legal HDFS style file system path;
-        //  3. It's OK that the output path doesn't exist yet;
-        val allPaths = paths ++ caseInsensitiveOptions.get("path")
-        val outputPath = if (allPaths.length == 1) {
-          val path = new Path(allPaths.head)
-          val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
-          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        } else {
-          throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
-            s"got: ${allPaths.mkString(", ")}")
-        }
-        val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-        PartitioningUtils.validatePartitionColumn(
-          data.schema, partitionColumns, caseSensitive)
-        // If we are appending to a table that already exists, make sure the partitioning matches
-        // up.  If we fail to load the table for whatever reason, ignore the check.
-        if (mode == SaveMode.Append) {
-          val existingPartitionColumns = Try {
-            getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
-          }.getOrElse(Seq.empty[String])
-          // TODO: Case sensitivity.
-          val sameColumns =
-   ==
-          if (existingPartitionColumns.nonEmpty && !sameColumns) {
-            throw new AnalysisException(
-              s"""Requested partitioning does not match existing partitioning.
-                 |Existing partitioning columns:
-                 |  ${existingPartitionColumns.mkString(", ")}
-                 |Requested partitioning columns:
-                 |  ${partitionColumns.mkString(", ")}
-                 |""".stripMargin)
-          }
-        }
-        // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
-        // not need to have the query as child, to avoid to analyze an optimized query,
-        // because InsertIntoHadoopFsRelationCommand will be optimized first.
-        val columns = { name =>
-          val plan = data.logicalPlan
-          plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
-            throw new AnalysisException(
-              s"Unable to resolve $name given [${", ")}]")
-          }.asInstanceOf[Attribute]
-        }
-        // For partitioned relation r, r.schema's column ordering can be different from the column
-        // ordering of data.logicalPlan (partition columns are all moved after data column).  This
-        // will be adjusted within InsertIntoHadoopFsRelation.
-        val plan =
-          InsertIntoHadoopFsRelationCommand(
-            outputPath = outputPath,
-            staticPartitionKeys = Map.empty,
-            customPartitionLocations = Map.empty,
-            partitionColumns = columns,
-            bucketSpec = bucketSpec,
-            fileFormat = format,
-            refreshFunction = _ => Unit, // No existing table needs to be refreshed.
-            options = options,
-            query = data.logicalPlan,
-            mode = mode,
-            catalogTable = catalogTable)
-        sparkSession.sessionState.executePlan(plan).toRdd
+        writeInFileFormat(format, mode, data)
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
         copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+      case _ =>
+        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
+    }
+  }
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   */
+  def write(mode: SaveMode, data: DataFrame): Unit = {
+    if ([CalendarIntervalType])) {
+      throw new AnalysisException("Cannot save interval data type into external storage.")
+    }
+    providingClass.newInstance() match {
+      case dataSource: CreatableRelationProvider =>
+        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
+      case format: FileFormat =>
+        writeInFileFormat(format, mode, data)
       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 55b72c625d..5bca90b7c9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -62,17 +62,12 @@ class PartitionedTablePerfStatsSuite
   private def setupPartitionedHiveTable(
-      tableName: String, dir: File, scale: Int,
-      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
+      tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
-    if (clearMetricsBeforeCreate) {
-      HiveCatalogMetrics.reset()
-    }
       |create external table $tableName (fieldOne long)
       |partitioned by (partCol1 int, partCol2 int)
@@ -88,17 +83,12 @@ class PartitionedTablePerfStatsSuite
   private def setupPartitionedDatasourceTable(
-      tableName: String, dir: File, scale: Int,
-      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
+      tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
-    if (clearMetricsBeforeCreate) {
-      HiveCatalogMetrics.reset()
-    }
       |create table $tableName (fieldOne long, partCol1 int, partCol2 int)
       |using parquet
@@ -271,8 +261,8 @@ class PartitionedTablePerfStatsSuite
       withTable("test") {
         withTempDir { dir =>
-          setupPartitionedDatasourceTable(
-            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          HiveCatalogMetrics.reset()
+          setupPartitionedDatasourceTable("test", dir, scale = 10, repair = false)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
           assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
@@ -285,8 +275,7 @@ class PartitionedTablePerfStatsSuite
       withTable("test") {
         withTempDir { dir =>
-          setupPartitionedHiveTable(
-            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          setupPartitionedHiveTable("test", dir, scale = 10, repair = false)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
           assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
@@ -416,12 +405,8 @@ class PartitionedTablePerfStatsSuite
           executorPool.awaitTermination(30, TimeUnit.SECONDS)
-          // check the cache hit, we use the metric of METRIC_FILES_DISCOVERED and
-          // METRIC_PARALLEL_LISTING_JOB_COUNT to check this, while the lock take effect,
-          // only one thread can really do the build, so the listing job count is 2, the other
-          // one is cache.load func. Also METRIC_FILES_DISCOVERED is $partition_num * 2
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 100)
-          assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 50)
+          assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)