Commit 415f9f34 authored by Xiao Li, committed by Wenchen Fan

[SPARK-19921][SQL][TEST] Enable end-to-end testing using different Hive metastore versions.

### What changes were proposed in this pull request?

To improve the quality of Spark SQL across different Hive metastore versions, this PR enables end-to-end testing against those versions. It allows the test cases in sql/hive to pass an existing Hive client when creating a SparkSession; a sketch of the resulting flow follows the list below.
- Since Derby does not allow concurrent connections, each pre-built Hive client uses a different database from TestHive's built-in 1.2.1 client.
- Since our test cases in sql/hive can only create a single SparkContext per JVM, the newly created SparkSession shares the SparkContext with the existing TestHive SparkSession.
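A minimal sketch of the resulting test flow (illustrative only, not part of this diff): build a stand-alone `HiveClient` for a chosen metastore version via `HiveClientBuilder` (package-private, so this assumes the code lives in `org.apache.spark.sql.hive.client`, as `VersionsSuite` does), then wrap it in `TestHiveVersion`, which reuses `TestHive`'s SparkContext instead of starting a second one. The version string and assertion are placeholders.

```scala
import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.client.HiveClientBuilder
import org.apache.spark.sql.hive.test.TestHiveVersion

// Sketch only: HiveClientBuilder is package-private, so this code is assumed to live
// in org.apache.spark.sql.hive.client, like VersionsSuite itself.
val clientBuilder = new HiveClientBuilder
// The builder points each pre-built client at its own Derby database, so it does not
// collide with TestHive's built-in 1.2.1 metastore client (see the note above).
val client = clientBuilder.buildClient("1.0", new Configuration())

// TestHiveVersion wraps the client and shares TestHive's existing SparkContext.
val versionSpark = TestHiveVersion(client)

versionSpark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
assert(versionSpark.table("tbl").collect().toSeq == Seq(Row(1)))

// Reset the session before building a client for the next metastore version.
versionSpark.reset()
```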

### How was this patch tested?
Fixed the existing test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17260 from gatorsmile/versionSuite.
parent 4dc3a817
@@ -87,7 +87,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
/**
* A catalog that interacts with external systems.
*/
val externalCatalog: ExternalCatalog =
lazy val externalCatalog: ExternalCatalog =
SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
SharedState.externalCatalogClassName(sparkContext.conf),
sparkContext.conf,
......
@@ -62,7 +62,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
/**
* A Hive client used to interact with the metastore.
*/
val client: HiveClient = {
lazy val client: HiveClient = {
HiveUtils.newClientForMetadata(conf, hadoopConf)
}
......
@@ -24,23 +24,24 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.language.implicitConversions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.exec.FunctionRegistry
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{ExperimentalMethods, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.CacheTableCommand
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.streaming.StreamingQueryManager
import org.apache.spark.util.{ShutdownHookManager, Utils}
// SPARK-3729: Test key required to check for initialization errors with config.
@@ -58,6 +59,37 @@ object TestHive
.set("spark.ui.enabled", "false")))
case class TestHiveVersion(hiveClient: HiveClient)
extends TestHiveContext(TestHive.sparkContext, hiveClient)
private[hive] class TestHiveExternalCatalog(
conf: SparkConf,
hadoopConf: Configuration,
hiveClient: Option[HiveClient] = None)
extends HiveExternalCatalog(conf, hadoopConf) with Logging {
override lazy val client: HiveClient =
hiveClient.getOrElse {
HiveUtils.newClientForMetadata(conf, hadoopConf)
}
}
private[hive] class TestHiveSharedState(
sc: SparkContext,
hiveClient: Option[HiveClient] = None)
extends SharedState(sc) {
override lazy val externalCatalog: ExternalCatalog = {
new TestHiveExternalCatalog(
sc.conf,
sc.hadoopConfiguration,
hiveClient)
}
}
/**
* A locally running test instance of Spark's Hive execution engine.
*
@@ -81,6 +113,12 @@ class TestHiveContext(
this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc), loadTestTables))
}
def this(sc: SparkContext, hiveClient: HiveClient) {
this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc),
hiveClient,
loadTestTables = false))
}
override def newSession(): TestHiveContext = {
new TestHiveContext(sparkSession.newSession())
}
@@ -115,7 +153,7 @@ class TestHiveContext(
*/
private[hive] class TestHiveSparkSession(
@transient private val sc: SparkContext,
@transient private val existingSharedState: Option[SharedState],
@transient private val existingSharedState: Option[TestHiveSharedState],
private val loadTestTables: Boolean)
extends SparkSession(sc) with Logging { self =>
@@ -126,6 +164,13 @@ private[hive] class TestHiveSparkSession(
loadTestTables)
}
def this(sc: SparkContext, hiveClient: HiveClient, loadTestTables: Boolean) {
this(
sc,
existingSharedState = Some(new TestHiveSharedState(sc, Some(hiveClient))),
loadTestTables)
}
{ // set the metastore temporary configuration
val metastoreTempConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false) ++ Map(
ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
@@ -141,8 +186,8 @@ private[hive] class TestHiveSparkSession(
assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive")
@transient
override lazy val sharedState: SharedState = {
existingSharedState.getOrElse(new SharedState(sc))
override lazy val sharedState: TestHiveSharedState = {
existingSharedState.getOrElse(new TestHiveSharedState(sc))
}
@transient
@@ -463,6 +508,14 @@ private[hive] class TestHiveSparkSession(
FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
// The HDFS root scratch dir requires write-all (733) permission. For each connecting user,
// an HDFS scratch dir ${hive.exec.scratchdir}/<username> is created with
// ${hive.scratch.dir.permission}. The simplest way to resolve the permission issue is to
// delete the directory; it will later be re-created with the right permission.
val location = new Path(sc.hadoopConfiguration.get(ConfVars.SCRATCHDIR.varname))
val fs = location.getFileSystem(sc.hadoopConfiguration)
fs.delete(location, true)
// Some tests corrupt this value on purpose, which breaks the RESET call below.
sessionState.conf.setConfString("fs.defaultFS", new File(".").toURI.toString)
// It is important that we RESET first as broken hooks that might have been set could break
......
@@ -21,21 +21,20 @@ import java.io.{ByteArrayOutputStream, File, PrintStream}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.test.TestHiveVersion
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.tags.ExtendedHiveTest
@@ -48,11 +47,31 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
* is not fully tested.
*/
@ExtendedHiveTest
class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with Logging {
class VersionsSuite extends SparkFunSuite with Logging {
private val clientBuilder = new HiveClientBuilder
import clientBuilder.buildClient
/**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns.
*/
protected def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir().getCanonicalFile
try f(dir) finally Utils.deleteRecursively(dir)
}
/**
* Drops the tables in `tableNames` after calling `f`.
*/
protected def withTable(tableNames: String*)(f: => Unit): Unit = {
try f finally {
tableNames.foreach { name =>
versionSpark.sql(s"DROP TABLE IF EXISTS $name")
}
}
}
test("success sanity check") {
val badClient = buildClient(HiveUtils.hiveExecutionVersion, new Configuration())
val db = new CatalogDatabase("default", "desc", new URI("loc"), Map())
@@ -93,6 +112,8 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
private var client: HiveClient = null
private var versionSpark: TestHiveVersion = null
versions.foreach { version =>
test(s"$version: create client") {
client = null
@@ -105,6 +126,10 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
}
client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
if (versionSpark != null) versionSpark.reset()
versionSpark = TestHiveVersion(client)
assert(versionSpark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
.version.fullVersion.startsWith(version))
}
def table(database: String, tableName: String): CatalogTable = {
@@ -545,22 +570,22 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
test(s"$version: CREATE TABLE AS SELECT") {
withTable("tbl") {
spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
versionSpark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
assert(versionSpark.table("tbl").collect().toSeq == Seq(Row(1)))
}
}
test(s"$version: Delete the temporary staging directory and files after each insert") {
withTempDir { tmpDir =>
withTable("tab") {
spark.sql(
versionSpark.sql(
s"""
|CREATE TABLE tab(c1 string)
|location '${tmpDir.toURI.toString}'
""".stripMargin)
(1 to 3).map { i =>
spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
versionSpark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
}
def listFiles(path: File): List[String] = {
val dir = path.listFiles()
@@ -569,7 +594,9 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
folders.flatMap(listFiles) ++: filePaths
}
// expect 2 files left: `.part-00000-random-uuid.crc` and `part-00000-random-uuid`
assert(listFiles(tmpDir).length == 2)
// 0.12, 0.13, 1.0 and 1.1 also leave two more files: ._SUCCESS.crc and _SUCCESS
val metadataFiles = Seq("._SUCCESS.crc", "_SUCCESS")
assert(listFiles(tmpDir).filterNot(metadataFiles.contains).length == 2)
}
}
}
@@ -609,7 +636,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
withTable(tableName, tempTableName) {
// Creates the external partitioned Avro table to be tested.
sql(
versionSpark.sql(
s"""CREATE EXTERNAL TABLE $tableName
|PARTITIONED BY (ds STRING)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
@@ -622,7 +649,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
)
// Creates a temporary Avro table used to prepare the testing Avro file.
sql(
versionSpark.sql(
s"""CREATE EXTERNAL TABLE $tempTableName
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
|STORED AS
@@ -634,43 +661,29 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
)
// Generates Avro data.
sql(s"INSERT OVERWRITE TABLE $tempTableName SELECT 1, STRUCT(2, 2.5)")
versionSpark.sql(s"INSERT OVERWRITE TABLE $tempTableName SELECT 1, STRUCT(2, 2.5)")
// Adds generated Avro data as a new partition to the testing table.
sql(s"ALTER TABLE $tableName ADD PARTITION (ds = 'foo') LOCATION '$path/$tempTableName'")
versionSpark.sql(
s"ALTER TABLE $tableName ADD PARTITION (ds = 'foo') LOCATION '$path/$tempTableName'")
// The following query fails before SPARK-13709 is fixed. This is because when reading
// data from table partitions, the Avro deserializer needs the Avro schema, which is defined
// in the table property "avro.schema.literal". However, we only initialize the deserializer
// using partition properties, which don't include the wanted property entry. Merging the
// two sets of properties solves the problem.
checkAnswer(
sql(s"SELECT * FROM $tableName"),
Row(1, Row(2, 2.5D), "foo")
)
assert(versionSpark.sql(s"SELECT * FROM $tableName").collect() ===
Array(Row(1, Row(2, 2.5D), "foo")))
}
}
}
test(s"$version: CTAS for managed data source tables") {
withTable("t", "t1") {
import spark.implicits._
val tPath = new Path(spark.sessionState.conf.warehousePath, "t")
Seq("1").toDF("a").write.saveAsTable("t")
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location == makeQualifiedPath(tPath.toString))
assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath))
checkAnswer(spark.table("t"), Row("1") :: Nil)
val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1")
spark.sql("create table t1 using parquet as select 2 as a")
val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
assert(table1.location == makeQualifiedPath(t1Path.toString))
assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path))
checkAnswer(spark.table("t1"), Row(2) :: Nil)
versionSpark.range(1).write.saveAsTable("t")
assert(versionSpark.table("t").collect() === Array(Row(0)))
versionSpark.sql("create table t1 using parquet as select 2 as a")
assert(versionSpark.table("t1").collect() === Array(Row(2)))
}
}
// TODO: add more tests.
......