Commit dd85eb54 authored by Liang-Chi Hsieh, committed by Reynold Xin

[SPARK-18107][SQL] Insert overwrite statement runs much slower in spark-sql than it does in hive-client

## What changes were proposed in this pull request?

As reported on the JIRA, an insert overwrite statement runs much slower in Spark SQL than it does in hive-client.

There is a patch, [HIVE-11940](https://github.com/apache/hive/commit/ba21806b77287e237e1aa68fa169d2a81e07346d), which largely improves insert overwrite performance in Hive; it was only merged after Hive 2.0.0.

Because Spark SQL uses an older Hive library, we cannot benefit from this improvement.

The reporter verified that there is also a big performance gap between Hive 1.2.1 (520.037 secs) and Hive 2.0.1 (35.975 secs) when executing insert overwrite, roughly a 14x difference.

Instead of upgrading Spark SQL to Hive 2.0, which is not a trivial task, this patch deletes the partition directory ourselves before asking Hive to load data files into the partition, as sketched below.
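
The core of the change is small. Here is a minimal, self-contained sketch of the idea (the helper name `clearPartitionDir` and the standalone `object` wrapper are mine for illustration; the actual change lives inline in `InsertIntoHiveTable`, as the diff below shows): remove the partition directory through the Hadoop `FileSystem` API, so Hive's load only has to move the new files in.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object PartitionCleanup {
  // Hypothetical helper: delete a partition's directory up front so that the
  // subsequent Hive load only moves new files in, instead of running Hive's
  // slow overwrite/cleanup path. Returns true if the directory was removed.
  def clearPartitionDir(locationUri: String, hadoopConf: Configuration): Boolean = {
    val partitionPath = new Path(locationUri)
    val fs = partitionPath.getFileSystem(hadoopConf)
    if (fs.exists(partitionPath)) {
      if (!fs.delete(partitionPath, true)) { // recursive delete
        throw new RuntimeException(s"Cannot remove partition directory '$partitionPath'")
      }
      true // removed: the Hive load can now run with isOverwrite = false
    } else {
      false // nothing to remove: keep Hive's normal overwrite behavior
    }
  }
}
```

In the diff below, this logic sits inline and flips `doHiveOverwrite` to `false` once the directory is gone, so Hive's `loadPartition` is called with `isOverwrite = false`.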

Note: The case reported on the JIRA is insert overwrite to a partition. Since `Hive.loadTable` uses the same file-replacement function, insert overwrite to a table should have the same issue, and we can take the same approach of deleting the table directory first. I will update this patch to cover that case as well; a rough sketch of that follow-up is below.
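
As an assumption about that follow-up (not part of this commit), the table case could mirror the partition branch in the diff below, where `table`, `hadoopConf`, `overwrite`, and `doHiveOverwrite` come from the surrounding `InsertIntoHiveTable` code:

```scala
// Hypothetical follow-up for non-partitioned tables: clear the table's root
// directory before Hive.loadTable, mirroring the partition case.
if (overwrite) {
  table.catalogTable.storage.locationUri.foreach { uri =>
    val tablePath = new Path(uri)
    val fs = tablePath.getFileSystem(hadoopConf)
    if (fs.exists(tablePath)) {
      if (!fs.delete(tablePath, true)) {
        throw new RuntimeException(s"Cannot remove table directory '$tablePath'")
      }
      // Hive no longer needs its own overwrite pass.
      doHiveOverwrite = false
    }
  }
}
```
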
## How was this patch tested?

Jenkins tests.

There are existing tests using insert overwrite statements; those should pass. I also added a new test specifically for insert overwrite into a partition.

For the performance issue, since I don't have a Hive 2.0 environment, verification is left to the reporter; please refer to the JIRA.

Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #15667 from viirya/improve-hive-insertoverwrite.
parent d9d14650
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.SparkException
@@ -257,7 +258,28 @@ case class InsertIntoHiveTable(
          table.catalogTable.identifier.table,
          partitionSpec)

      var doHiveOverwrite = overwrite

      if (oldPart.isEmpty || !ifNotExists) {
        // SPARK-18107: Insert overwrite runs much slower than hive-client.
        // Newer Hive largely improves insert overwrite performance. Since Spark uses an
        // older Hive version and may not want to catch up with every new Hive release,
        // we delete the Hive partition first and then load the data files into it.
        if (oldPart.nonEmpty && overwrite) {
          oldPart.get.storage.locationUri.foreach { uri =>
            val partitionPath = new Path(uri)
            val fs = partitionPath.getFileSystem(hadoopConf)
            if (fs.exists(partitionPath)) {
              if (!fs.delete(partitionPath, true)) {
                throw new RuntimeException(
                  "Cannot remove partition directory '" + partitionPath.toString + "'")
              }
              // Don't let Hive do the overwrite operation, since it is slower.
              doHiveOverwrite = false
            }
          }
        }

        // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
        // which is currently considered as a Hive native command.
        val inheritTableSpecs = true
@@ -266,7 +288,7 @@
          table.catalogTable.identifier.table,
          outputPath.toString,
          partitionSpec,
-         isOverwrite = overwrite,
+         isOverwrite = doHiveOverwrite,
          holdDDLTime = holdDDLTime,
          inheritTableSpecs = inheritTableSpecs)
      }
...
@@ -1973,6 +1973,39 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
test("Insert overwrite with partition") {
withTable("tableWithPartition") {
sql(
"""
|CREATE TABLE tableWithPartition (key int, value STRING)
|PARTITIONED BY (part STRING)
""".stripMargin)
sql(
"""
|INSERT OVERWRITE TABLE tableWithPartition PARTITION (part = '1')
|SELECT * FROM default.src
""".stripMargin)
checkAnswer(
sql("SELECT part, key, value FROM tableWithPartition"),
sql("SELECT '1' AS part, key, value FROM default.src")
)
sql(
"""
|INSERT OVERWRITE TABLE tableWithPartition PARTITION (part = '1')
|SELECT * FROM VALUES (1, "one"), (2, "two"), (3, null) AS data(key, value)
""".stripMargin)
checkAnswer(
sql("SELECT part, key, value FROM tableWithPartition"),
sql(
"""
|SELECT '1' AS part, key, value FROM VALUES
|(1, "one"), (2, "two"), (3, null) AS data(key, value)
""".stripMargin)
)
}
}
  def testCommandAvailable(command: String): Boolean = {
    val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
    attempt.isSuccess && attempt.get == 0
...