Commit e50934f1 authored by Yin Huai, committed by Michael Armbrust

[SPARK-5723][SQL] Change the default file format to Parquet for CTAS statements.

JIRA: https://issues.apache.org/jira/browse/SPARK-5723

Author: Yin Huai <yhuai@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>

Closes #4639 from yhuai/defaultCTASFileFormat and squashes the following commits:

a568137 [Yin Huai] Merge remote-tracking branch 'upstream/master' into defaultCTASFileFormat
ad2b07d [Yin Huai] Update tests and error messages.
8af5b2a [Yin Huai] Update conf key and unit test.
5a67903 [Yin Huai] Use data source write path for Hive's CTAS statements when no storage format/handler is specified.
parent d5f12bfe
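
For context, a minimal sketch of the user-visible change (assuming a 1.3-era HiveContext named hiveContext; the session setup and table name are not part of this commit):

    // Opt in to the conversion added by this patch.
    hiveContext.setConf("spark.sql.hive.convertCTAS", "true")

    // With no SerDe, file format, or storage handler specified, this CTAS is now
    // planned through the data source write path, so the table is stored in the
    // format given by spark.sql.sources.default (Parquet) instead of Hive's
    // default TextFile format.
    hiveContext.sql("CREATE TABLE kv AS SELECT key, value FROM src")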
......@@ -61,6 +61,21 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
protected[sql] def convertMetastoreParquet: Boolean =
getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"
/**
* When true, a table created by a Hive CTAS statement (no USING clause) will be
* converted to a data source table, using the data source set by spark.sql.sources.default.
* The table in a CTAS statement will be converted when it meets any of the following conditions:
* - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED AS), or
* a Storage Handler (STORED BY), and the value of hive.default.fileformat in hive-site.xml
* is either TextFile or SequenceFile.
* - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and no SerDe
* is specified (no ROW FORMAT SERDE clause).
* - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as the file format
* and no SerDe is specified (no ROW FORMAT SERDE clause).
*/
protected[sql] def convertCTAS: Boolean =
getConf("spark.sql.hive.convertCTAS", "false").toBoolean
override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution(plan)
......
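
To make the conditions in the convertCTAS doc comment above concrete, a few illustrative statements (table names t1-t3 are hypothetical; the converted/not-converted outcomes match the "CTAS without serde" test added below):

    // Converted: no SerDe, file format, or storage handler is specified.
    sql("CREATE TABLE t1 AS SELECT key, value FROM src")

    // Converted: STORED AS TEXTFILE (or SEQUENCEFILE) with no ROW FORMAT SERDE clause.
    sql("CREATE TABLE t2 STORED AS TEXTFILE AS SELECT key, value FROM src")

    // Not converted: RCFILE (likewise ORC or PARQUET) keeps Hive's native CTAS path.
    sql("CREATE TABLE t3 STORED AS RCFILE AS SELECT key, value FROM src")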
......@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark.Logging
import org.apache.spark.sql.{AnalysisException, SQLContext}
import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext}
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Catalog, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
......@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec}
import org.apache.spark.sql.sources.{DDLParser, LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, DDLParser, LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
......@@ -502,24 +502,69 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
Some(sa.getQB().getTableDesc)
}
execution.CreateTableAsSelect(
databaseName,
tableName,
child,
allowExisting,
desc)
// Check if the query specifies file format or storage handler.
val hasStorageSpec = desc match {
case Some(crtTbl) =>
crtTbl != null && (crtTbl.getSerName != null || crtTbl.getStorageHandler != null)
case None => false
}
if (hive.convertCTAS && !hasStorageSpec) {
// Do the conversion when spark.sql.hive.convertCTAS is true and the query
// does not specify any storage format (file format and storage handler).
if (dbName.isDefined) {
throw new AnalysisException(
"Cannot specify database name in a CTAS statement " +
"when spark.sql.hive.convertCTAS is set to true.")
}
val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
tblName,
hive.conf.defaultDataSourceName,
temporary = false,
mode,
options = Map.empty[String, String],
child
)
} else {
execution.CreateTableAsSelect(
databaseName,
tableName,
child,
allowExisting,
desc)
}
case p: LogicalPlan if p.resolved => p
case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) =>
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
execution.CreateTableAsSelect(
databaseName,
tableName,
child,
allowExisting,
None)
if (hive.convertCTAS) {
if (dbName.isDefined) {
throw new AnalysisException(
"Cannot specify database name in a CTAS statement " +
"when spark.sql.hive.convertCTAS is set to true.")
}
val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
tblName,
hive.conf.defaultDataSourceName,
temporary = false,
mode,
options = Map.empty[String, String],
child
)
} else {
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
execution.CreateTableAsSelect(
databaseName,
tableName,
child,
allowExisting,
None)
}
}
}
......
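
A note on the branches added above: allowExisting is true exactly when the statement had IF NOT EXISTS, so the converted plan keeps Hive's semantics by mapping it to SaveMode.Ignore, while a plain CTAS maps to SaveMode.ErrorIfExists. A minimal sketch of that decision in isolation (the helper name is hypothetical):

    import org.apache.spark.sql.SaveMode

    // IF NOT EXISTS: silently return if the table exists;
    // otherwise analysis fails when the table already exists.
    def ctasSaveMode(allowExisting: Boolean): SaveMode =
      if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists

Both branches also reject an explicit database name with an AnalysisException, since the data source write path currently creates tables only in the current database.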
......@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.sources._
......@@ -121,7 +122,7 @@ case class CreateMetastoreDataSource(
if (allowExisting) {
return Seq.empty[Row]
} else {
sys.error(s"Table $tableName already exists.")
throw new AnalysisException(s"Table $tableName already exists.")
}
}
......@@ -172,9 +173,11 @@ case class CreateMetastoreDataSourceAsSelect(
// Check if we need to throw an exception or just return.
mode match {
case SaveMode.ErrorIfExists =>
sys.error(s"Table $tableName already exists. " +
s"If you want to append into it, please set mode to SaveMode.Append. " +
s"Or, if you want to overwrite it, please set mode to SaveMode.Overwrite.")
throw new AnalysisException(s"Table $tableName already exists. " +
s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite" +
s"the existing data. " +
s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
case SaveMode.Ignore =>
// Since the table already exists and the save mode is Ignore, we will just return.
return Seq.empty[Row]
......@@ -199,7 +202,7 @@ case class CreateMetastoreDataSourceAsSelect(
s"== Actual Schema ==" +:
createdRelation.schema.treeString.split("\\n")).mkString("\n")}
""".stripMargin
sys.error(errorMessage)
throw new AnalysisException(errorMessage)
} else if (i != createdRelation.relation) {
val errorDescription =
s"Cannot append to table $tableName because the resolved relation does not " +
......@@ -216,10 +219,10 @@ case class CreateMetastoreDataSourceAsSelect(
s"== Actual Relation ==" ::
createdRelation.toString :: Nil).mkString("\n")}
""".stripMargin
sys.error(errorMessage)
throw new AnalysisException(errorMessage)
}
case o =>
sys.error(s"Saving data in ${o.toString} is not supported.")
throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
}
case SaveMode.Overwrite =>
hiveContext.sql(s"DROP TABLE IF EXISTS $tableName")
......
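
The sys.error-to-AnalysisException changes are more than cosmetic: sys.error throws a bare RuntimeException, whereas AnalysisException is Spark SQL's typed, user-facing error, so callers can catch it specifically. A hedged sketch of a caller (the table name is hypothetical):

    import org.apache.spark.sql.AnalysisException

    try {
      hiveContext.sql("CREATE TABLE alreadyThere AS SELECT key, value FROM src")
    } catch {
      case e: AnalysisException =>
        // e.g. "Table alreadyThere already exists. ..."
        println(s"CTAS rejected during analysis: ${e.getMessage}")
    }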
......@@ -306,8 +306,8 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
|SELECT * FROM jsonTable
""".stripMargin)
// Create the table again should trigger an AlreadyExistsException.
val message = intercept[RuntimeException] {
// Create the table again should trigger an AnalysisException.
val message = intercept[AnalysisException] {
sql(
s"""
|CREATE TABLE ctasJsonTable
......@@ -516,7 +516,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT * FROM createdJsonTable"),
df.collect())
var message = intercept[RuntimeException] {
var message = intercept[AnalysisException] {
createExternalTable("createdJsonTable", filePath.toString)
}.getMessage
assert(message.contains("Table createdJsonTable already exists."),
......
......@@ -17,10 +17,13 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.hive.{MetastoreRelation, HiveShim}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
......@@ -42,6 +45,73 @@ class SQLQuerySuite extends QueryTest {
)
}
test("CTAS without serde") {
def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
relation match {
case LogicalRelation(r: ParquetRelation2) =>
if (!isDataSourceParquet) {
fail(
s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
s"${ParquetRelation2.getClass.getCanonicalName}.")
}
case r: MetastoreRelation =>
if (isDataSourceParquet) {
fail(
s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " +
s"${classOf[MetastoreRelation].getCanonicalName}.")
}
}
}
val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
setConf("spark.sql.hive.convertCTAS", "true")
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
sql("CREATE TABLE IF NOT EXISTS ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
var message = intercept[AnalysisException] {
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
}.getMessage
assert(message.contains("Table ctas1 already exists"))
checkRelation("ctas1", true)
sql("DROP TABLE ctas1")
// Specifying a database name in a CTAS statement that will be converted to the
// data source write path is not allowed right now.
message = intercept[AnalysisException] {
sql("CREATE TABLE default.ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
}.getMessage
assert(
message.contains("Cannot specify database name in a CTAS statement"),
"When spark.sql.hive.convertCTAS is true, we should not allow " +
"database name specified.")
sql("CREATE TABLE ctas1 stored as textfile AS SELECT key k, value FROM src ORDER BY k, value")
checkRelation("ctas1", true)
sql("DROP TABLE ctas1")
sql(
"CREATE TABLE ctas1 stored as sequencefile AS SELECT key k, value FROM src ORDER BY k, value")
checkRelation("ctas1", true)
sql("DROP TABLE ctas1")
sql("CREATE TABLE ctas1 stored as rcfile AS SELECT key k, value FROM src ORDER BY k, value")
checkRelation("ctas1", false)
sql("DROP TABLE ctas1")
sql("CREATE TABLE ctas1 stored as orc AS SELECT key k, value FROM src ORDER BY k, value")
checkRelation("ctas1", false)
sql("DROP TABLE ctas1")
sql("CREATE TABLE ctas1 stored as parquet AS SELECT key k, value FROM src ORDER BY k, value")
checkRelation("ctas1", false)
sql("DROP TABLE ctas1")
setConf("spark.sql.hive.convertCTAS", originalConf)
}
test("CTAS with serde") {
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect()
sql(
......