Commit f7c07db8 authored by Wenchen Fan, committed by gatorsmile

[SPARK-19152][SQL][FOLLOWUP] simplify CreateHiveTableAsSelectCommand

## What changes were proposed in this pull request?

After https://github.com/apache/spark/pull/16552, `CreateHiveTableAsSelectCommand` became very similar to `CreateDataSourceTableAsSelectCommand`, and we can simplify it further by creating the table only in the table-not-exists branch.
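The refactored control flow is easier to see in isolation. Below is a minimal, self-contained sketch of the pattern (the `Catalog` trait and its methods are illustrative stand-ins, not Spark APIs): the table is created only in the not-exists branch, and rolled back if writing the query output fails.

```scala
import scala.util.control.NonFatal

// Illustrative model only: `Catalog` and its methods stand in for Spark's
// SessionCatalog / InsertIntoTable machinery.
trait Catalog {
  def tableExists(table: String): Boolean
  def createTable(table: String): Unit
  def dropTable(table: String): Unit
  def insert(table: String, overwrite: Boolean): Unit
}

object CtasFlow {
  def run(catalog: Catalog, table: String): Unit = {
    if (catalog.tableExists(table)) {
      // Table already exists: append to it by name; never re-create it here.
      catalog.insert(table, overwrite = false)
    } else {
      catalog.createTable(table)
      try {
        catalog.insert(table, overwrite = true)
      } catch {
        case NonFatal(e) =>
          // Drop the table we just created so a failed CTAS leaves nothing behind.
          catalog.dropTable(table)
          throw e
      }
    }
  }
}
```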

This PR also adds Hive provider checking in the DataStream reader/writer, which was missed in #16552.
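From the user's perspective the new checks fail fast. A sketch of the behavior, assuming a `SparkSession` named `spark` built with Hive support (the path is a placeholder):

```scala
import org.apache.spark.sql.AnalysisException

try {
  spark.readStream.format("hive").load("/tmp/some-dir")
} catch {
  case e: AnalysisException =>
    // The same guard fires in DataStreamWriter.start() for the write side.
    assert(e.message.contains("Hive data source can only be used with tables"))
}
```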

## How was this patch tested?

New test cases in `HiveDDLSuite` verify that the streaming reader and writer reject the `hive` provider; the `CreateHiveTableAsSelectCommand` change is a refactoring covered by existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16693 from cloud-fan/minor.
parent cfcfc92f
DataStreamReader.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.types.StructType
@@ -116,6 +117,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * @since 2.0.0
    */
   def load(): DataFrame = {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+        "read files of Hive data source directly.")
+    }
+
     val dataSource =
       DataSource(
         sparkSession,
DataStreamWriter.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, ForeachWriter}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink}
@@ -221,6 +222,11 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    * @since 2.0.0
    */
   def start(): StreamingQuery = {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+        "write files of Hive data source directly.")
+    }
+
     if (source == "memory") {
       assertNotPartitioned("memory")
       if (extraOptions.get("queryName").isEmpty) {
CreateHiveTableAsSelectCommand.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.execution
 import scala.util.control.NonFatal

 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.MetastoreRelation

 /**
@@ -44,40 +44,6 @@ case class CreateHiveTableAsSelectCommand(
   override def innerChildren: Seq[LogicalPlan] = Seq(query)

   override def run(sparkSession: SparkSession): Seq[Row] = {
-    lazy val metastoreRelation: MetastoreRelation = {
-      import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-      import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
-      import org.apache.hadoop.io.Text
-      import org.apache.hadoop.mapred.TextInputFormat
-
-      val withFormat =
-        tableDesc.withNewStorage(
-          inputFormat =
-            tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
-          outputFormat =
-            tableDesc.storage.outputFormat
-              .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
-          serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)),
-          compressed = tableDesc.storage.compressed)
-
-      val withSchema = if (withFormat.schema.isEmpty) {
-        tableDesc.copy(schema = query.schema)
-      } else {
-        withFormat
-      }
-
-      sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = true)
-
-      // Get the Metastore Relation
-      sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
-        case SubqueryAlias(_, r: SimpleCatalogRelation, _) =>
-          val tableMeta = r.metadata
-          MetastoreRelation(tableMeta.database, tableMeta.identifier.table)(tableMeta, sparkSession)
-      }
-    }
-
-    // TODO ideally, we should get the output data ready first and then
-    // add the relation into catalog, just in case of failure occurs while data
-    // processing.
     if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
       assert(mode != SaveMode.Overwrite,
         s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
@@ -89,12 +55,30 @@ case class CreateHiveTableAsSelectCommand(
         // Since the table already exists and the save mode is Ignore, we will just return.
         return Seq.empty
       }
-      sparkSession.sessionState.executePlan(InsertIntoTable(
-        metastoreRelation, Map(), query, overwrite = false, ifNotExists = false)).toRdd
+
+      sparkSession.sessionState.executePlan(
+        InsertIntoTable(
+          UnresolvedRelation(tableIdentifier),
+          Map(),
+          query,
+          overwrite = false,
+          ifNotExists = false)).toRdd
     } else {
+      // TODO ideally, we should get the output data ready first and then
+      // add the relation into catalog, just in case of failure occurs while data
+      // processing.
+      assert(tableDesc.schema.isEmpty)
+      sparkSession.sessionState.catalog.createTable(
+        tableDesc.copy(schema = query.schema), ignoreIfExists = false)
+
       try {
-        sparkSession.sessionState.executePlan(InsertIntoTable(
-          metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
+        sparkSession.sessionState.executePlan(
+          InsertIntoTable(
+            UnresolvedRelation(tableIdentifier),
+            Map(),
+            query,
+            overwrite = true,
+            ifNotExists = false)).toRdd
       } catch {
         case NonFatal(e) =>
           // drop the created table.
HiveDDLSuite.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
-  import spark.implicits._
+  import testImplicits._

   override def afterEach(): Unit = {
     try {
@@ -1425,6 +1425,17 @@ class HiveDDLSuite
         Seq(1 -> "a").toDF("i", "j").write.format("hive").save(dir.getAbsolutePath)
       }
       assert(e2.message.contains("Hive data source can only be used with tables"))
+
+      val e3 = intercept[AnalysisException] {
+        spark.readStream.format("hive").load(dir.getAbsolutePath)
+      }
+      assert(e3.message.contains("Hive data source can only be used with tables"))
+
+      val e4 = intercept[AnalysisException] {
+        spark.readStream.schema(new StructType()).parquet(dir.getAbsolutePath)
+          .writeStream.format("hive").start(dir.getAbsolutePath)
+      }
+      assert(e4.message.contains("Hive data source can only be used with tables"))
     }
   }
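For contrast, the supported path for the `hive` provider goes through the session catalog rather than raw files, e.g. CTAS via `saveAsTable` (a sketch assuming `spark.implicits._` is in scope; the table name is illustrative):

```scala
// Writing to a catalog table with the hive provider is allowed; only
// direct file reads/writes are rejected by the new checks.
Seq(1 -> "a").toDF("i", "j").write.format("hive").saveAsTable("hive_ctas_example")
```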