Commit 38f4d6f4 authored by Reynold Xin

[SPARK-15954][SQL] Disable loading test tables in Python tests

## What changes were proposed in this pull request?
This patch introduces a flag to disable loading test tables in TestHiveSparkSession and disables it in Python. This fixes an issue in which python/run-tests would fail because the test tables could not be loaded.

Note that these test tables are not used outside of HiveCompatibilitySuite. In the long run we should probably decouple the loading of test tables from the test Hive setup.
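For reference, the JVM-side entry point after this change looks like the following. This is a minimal sketch, not part of the patch itself; it assumes a plain local SparkContext (the constructor itself switches the context to the Hive external catalog). Only the two-argument constructor and its default value come from this patch.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.test.TestHiveContext

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("TestHiveExample"))

// JVM callers keep the old behavior: loadTestTables defaults to true, so
// suites like HiveCompatibilitySuite still see src, srcpart, episodes, etc.
val hiveTestContext = new TestHiveContext(sc)

// Python goes through Py4J and now passes the flag explicitly; the JVM-side
// equivalent would be:
//   new TestHiveContext(sc, loadTestTables = false)
```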

## How was this patch tested?
This is a test-only change.

Author: Reynold Xin <rxin@databricks.com>

Closes #14005 from rxin/SPARK-15954.
parent 4a981dc8
@@ -492,7 +492,7 @@ class HiveContext(SQLContext):
         confusing error messages.
         """
         jsc = sparkContext._jsc.sc()
-        jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc)
+        jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc, False)
         return cls(sparkContext, jtestHive)

     def refreshTable(self, tableName):
@@ -73,8 +73,12 @@ class TestHiveContext(
     @transient override val sparkSession: TestHiveSparkSession)
   extends SQLContext(sparkSession) {

-  def this(sc: SparkContext) {
-    this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc)))
+  /**
+   * If loadTestTables is false, no test tables are loaded. Note that this flag can only be true
+   * when running in the JVM, i.e. it needs to be false when calling from Python.
+   */
+  def this(sc: SparkContext, loadTestTables: Boolean = true) {
+    this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc), loadTestTables))
   }

   override def newSession(): TestHiveContext = {
@@ -103,13 +107,24 @@ class TestHiveContext(
   }

+/**
+ * A [[SparkSession]] used in [[TestHiveContext]].
+ *
+ * @param sc SparkContext
+ * @param warehousePath path to the Hive warehouse directory
+ * @param scratchDirPath scratch directory used by Hive's metastore client
+ * @param metastoreTemporaryConf configuration options for Hive's metastore
+ * @param existingSharedState optional [[TestHiveSharedState]]
+ * @param loadTestTables if true, load the test tables. They can only be loaded when running
+ *                       in the JVM, i.e. when calling from Python this flag has to be false.
+ */
 private[hive] class TestHiveSparkSession(
     @transient private val sc: SparkContext,
     val warehousePath: File,
     scratchDirPath: File,
     metastoreTemporaryConf: Map[String, String],
-    @transient private val existingSharedState: Option[TestHiveSharedState])
+    @transient private val existingSharedState: Option[TestHiveSharedState],
+    private val loadTestTables: Boolean)
   extends SparkSession(sc) with Logging { self =>

   // TODO: We need to set the temp warehouse path to sc's conf.
@@ -118,13 +133,14 @@ private[hive] class TestHiveSparkSession(
   // when we creating metadataHive. This flow is not easy to follow and can introduce
   // confusion when a developer is debugging an issue. We need to refactor this part
   // to just set the temp warehouse path in sc's conf.
-  def this(sc: SparkContext) {
+  def this(sc: SparkContext, loadTestTables: Boolean) {
     this(
       sc,
       Utils.createTempDir(namePrefix = "warehouse"),
       TestHiveContext.makeScratchDir(),
       HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false),
-      None)
+      None,
+      loadTestTables)
   }

   assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive")
@@ -144,7 +160,7 @@ private[hive] class TestHiveSparkSession(

   override def newSession(): TestHiveSparkSession = {
     new TestHiveSparkSession(
-      sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState))
+      sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState), loadTestTables)
   }

   private var cacheTables: Boolean = false
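The newSession() override above threads loadTestTables through, so a derived session inherits its parent's choice. A minimal sketch of that behavior follows; this is hypothetical test code (TestHiveSparkSession is private[hive], so it would have to live under org.apache.spark.sql.hive) and assumes an existing SparkContext `sc` with the Hive catalog enabled.

```scala
// Hypothetical check, not part of the patch:
val session = new TestHiveSparkSession(sc, loadTestTables = false)
val child = session.newSession()
// `child` was constructed with the same loadTestTables = false, so it also
// skips registering src, srcpart, and the other QTestUtil tables.
```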
@@ -204,165 +220,173 @@ private[hive] class TestHiveSparkSession(
     testTables += (testTable.name -> testTable)
   }

-  // The test tables that are defined in the Hive QTestUtil.
-  // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
-  // https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql
-  @transient
-  val hiveQTestUtilTables = Seq(
-    TestTable("src",
-      "CREATE TABLE src (key INT, value STRING)".cmd,
-      s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),
-    TestTable("src1",
-      "CREATE TABLE src1 (key INT, value STRING)".cmd,
-      s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
-    TestTable("srcpart", () => {
-      sql(
-        "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
-      for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
-        sql(
-          s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
-             |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')
-           """.stripMargin)
-      }
-    }),
-    TestTable("srcpart1", () => {
-      sql(
-        "CREATE TABLE srcpart1 (key INT, value STRING) PARTITIONED BY (ds STRING, hr INT)")
-      for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- 11 to 12) {
-        sql(
-          s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
-             |OVERWRITE INTO TABLE srcpart1 PARTITION (ds='$ds',hr='$hr')
-           """.stripMargin)
-      }
-    }),
-    TestTable("src_thrift", () => {
-      import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer
-      import org.apache.hadoop.mapred.{SequenceFileInputFormat, SequenceFileOutputFormat}
-      import org.apache.thrift.protocol.TBinaryProtocol
-
-      sql(
-        s"""
-         |CREATE TABLE src_thrift(fake INT)
-         |ROW FORMAT SERDE '${classOf[ThriftDeserializer].getName}'
-         |WITH SERDEPROPERTIES(
-         |  'serialization.class'='org.apache.spark.sql.hive.test.Complex',
-         |  'serialization.format'='${classOf[TBinaryProtocol].getName}'
-         |)
-         |STORED AS
-         |INPUTFORMAT '${classOf[SequenceFileInputFormat[_, _]].getName}'
-         |OUTPUTFORMAT '${classOf[SequenceFileOutputFormat[_, _]].getName}'
-        """.stripMargin)
-
-      sql(
-        s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/complex.seq")}' INTO TABLE src_thrift")
-    }),
-    TestTable("serdeins",
-      s"""CREATE TABLE serdeins (key INT, value STRING)
-         |ROW FORMAT SERDE '${classOf[LazySimpleSerDe].getCanonicalName}'
-         |WITH SERDEPROPERTIES ('field.delim'='\\t')
-       """.stripMargin.cmd,
-      "INSERT OVERWRITE TABLE serdeins SELECT * FROM src".cmd),
-    TestTable("episodes",
-      s"""CREATE TABLE episodes (title STRING, air_date STRING, doctor INT)
-         |STORED AS avro
-         |TBLPROPERTIES (
-         |  'avro.schema.literal'='{
-         |    "type": "record",
-         |    "name": "episodes",
-         |    "namespace": "testing.hive.avro.serde",
-         |    "fields": [
-         |      {
-         |          "name": "title",
-         |          "type": "string",
-         |          "doc": "episode title"
-         |      },
-         |      {
-         |          "name": "air_date",
-         |          "type": "string",
-         |          "doc": "initial date"
-         |      },
-         |      {
-         |          "name": "doctor",
-         |          "type": "int",
-         |          "doc": "main actor playing the Doctor in episode"
-         |      }
-         |    ]
-         |  }'
-         |)
-       """.stripMargin.cmd,
-      s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/episodes.avro")}' INTO TABLE episodes".cmd
-    ),
-    // THIS TABLE IS NOT THE SAME AS THE HIVE TEST TABLE episodes_partitioned AS DYNAMIC
-    // PARTITIONING IS NOT YET SUPPORTED
-    TestTable("episodes_part",
-      s"""CREATE TABLE episodes_part (title STRING, air_date STRING, doctor INT)
-         |PARTITIONED BY (doctor_pt INT)
-         |STORED AS avro
-         |TBLPROPERTIES (
-         |  'avro.schema.literal'='{
-         |    "type": "record",
-         |    "name": "episodes",
-         |    "namespace": "testing.hive.avro.serde",
-         |    "fields": [
-         |      {
-         |          "name": "title",
-         |          "type": "string",
-         |          "doc": "episode title"
-         |      },
-         |      {
-         |          "name": "air_date",
-         |          "type": "string",
-         |          "doc": "initial date"
-         |      },
-         |      {
-         |          "name": "doctor",
-         |          "type": "int",
-         |          "doc": "main actor playing the Doctor in episode"
-         |      }
-         |    ]
-         |  }'
-         |)
-       """.stripMargin.cmd,
-      // WORKAROUND: Required to pass schema to SerDe for partitioned tables.
-      // TODO: Pass this automatically from the table to partitions.
-      s"""
-         |ALTER TABLE episodes_part SET SERDEPROPERTIES (
-         |  'avro.schema.literal'='{
-         |    "type": "record",
-         |    "name": "episodes",
-         |    "namespace": "testing.hive.avro.serde",
-         |    "fields": [
-         |      {
-         |          "name": "title",
-         |          "type": "string",
-         |          "doc": "episode title"
-         |      },
-         |      {
-         |          "name": "air_date",
-         |          "type": "string",
-         |          "doc": "initial date"
-         |      },
-         |      {
-         |          "name": "doctor",
-         |          "type": "int",
-         |          "doc": "main actor playing the Doctor in episode"
-         |      }
-         |    ]
-         |  }'
-         |)
-        """.stripMargin.cmd,
-      s"""
-        INSERT OVERWRITE TABLE episodes_part PARTITION (doctor_pt=1)
-        SELECT title, air_date, doctor FROM episodes
-      """.cmd
-    ),
-    TestTable("src_json",
-      s"""CREATE TABLE src_json (json STRING) STORED AS TEXTFILE
-       """.stripMargin.cmd,
-      s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/json.txt")}' INTO TABLE src_json".cmd)
-  )
-
-  hiveQTestUtilTables.foreach(registerTestTable)
+  if (loadTestTables) {
+    // The test tables that are defined in the Hive QTestUtil.
+    // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+    // https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql
+    @transient
+    val hiveQTestUtilTables: Seq[TestTable] = Seq(
+      TestTable("src",
+        "CREATE TABLE src (key INT, value STRING)".cmd,
+        s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),
+      TestTable("src1",
+        "CREATE TABLE src1 (key INT, value STRING)".cmd,
+        s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
+      TestTable("srcpart", () => {
+        sql(
+          "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
+        for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
+          sql(
+            s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
+               |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')
+             """.stripMargin)
+        }
+      }),
+      TestTable("srcpart1", () => {
+        sql(
+          "CREATE TABLE srcpart1 (key INT, value STRING) PARTITIONED BY (ds STRING, hr INT)")
+        for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- 11 to 12) {
+          sql(
+            s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
+               |OVERWRITE INTO TABLE srcpart1 PARTITION (ds='$ds',hr='$hr')
+             """.stripMargin)
+        }
+      }),
+      TestTable("src_thrift", () => {
+        import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer
+        import org.apache.hadoop.mapred.{SequenceFileInputFormat, SequenceFileOutputFormat}
+        import org.apache.thrift.protocol.TBinaryProtocol
+
+        sql(
+          s"""
+           |CREATE TABLE src_thrift(fake INT)
+           |ROW FORMAT SERDE '${classOf[ThriftDeserializer].getName}'
+           |WITH SERDEPROPERTIES(
+           |  'serialization.class'='org.apache.spark.sql.hive.test.Complex',
+           |  'serialization.format'='${classOf[TBinaryProtocol].getName}'
+           |)
+           |STORED AS
+           |INPUTFORMAT '${classOf[SequenceFileInputFormat[_, _]].getName}'
+           |OUTPUTFORMAT '${classOf[SequenceFileOutputFormat[_, _]].getName}'
+          """.stripMargin)
+
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '${getHiveFile("data/files/complex.seq")}'
+             |INTO TABLE src_thrift
+           """.stripMargin)
+      }),
+      TestTable("serdeins",
+        s"""CREATE TABLE serdeins (key INT, value STRING)
+           |ROW FORMAT SERDE '${classOf[LazySimpleSerDe].getCanonicalName}'
+           |WITH SERDEPROPERTIES ('field.delim'='\\t')
+         """.stripMargin.cmd,
+        "INSERT OVERWRITE TABLE serdeins SELECT * FROM src".cmd),
+      TestTable("episodes",
+        s"""CREATE TABLE episodes (title STRING, air_date STRING, doctor INT)
+           |STORED AS avro
+           |TBLPROPERTIES (
+           |  'avro.schema.literal'='{
+           |    "type": "record",
+           |    "name": "episodes",
+           |    "namespace": "testing.hive.avro.serde",
+           |    "fields": [
+           |      {
+           |          "name": "title",
+           |          "type": "string",
+           |          "doc": "episode title"
+           |      },
+           |      {
+           |          "name": "air_date",
+           |          "type": "string",
+           |          "doc": "initial date"
+           |      },
+           |      {
+           |          "name": "doctor",
+           |          "type": "int",
+           |          "doc": "main actor playing the Doctor in episode"
+           |      }
+           |    ]
+           |  }'
+           |)
+         """.stripMargin.cmd,
+        s"""
+           |LOAD DATA LOCAL INPATH '${getHiveFile("data/files/episodes.avro")}'
+           |INTO TABLE episodes
+         """.stripMargin.cmd
+      ),
+      // THIS TABLE IS NOT THE SAME AS THE HIVE TEST TABLE episodes_partitioned AS DYNAMIC
+      // PARTITIONING IS NOT YET SUPPORTED
+      TestTable("episodes_part",
+        s"""CREATE TABLE episodes_part (title STRING, air_date STRING, doctor INT)
+           |PARTITIONED BY (doctor_pt INT)
+           |STORED AS avro
+           |TBLPROPERTIES (
+           |  'avro.schema.literal'='{
+           |    "type": "record",
+           |    "name": "episodes",
+           |    "namespace": "testing.hive.avro.serde",
+           |    "fields": [
+           |      {
+           |          "name": "title",
+           |          "type": "string",
+           |          "doc": "episode title"
+           |      },
+           |      {
+           |          "name": "air_date",
+           |          "type": "string",
+           |          "doc": "initial date"
+           |      },
+           |      {
+           |          "name": "doctor",
+           |          "type": "int",
+           |          "doc": "main actor playing the Doctor in episode"
+           |      }
+           |    ]
+           |  }'
+           |)
+         """.stripMargin.cmd,
+        // WORKAROUND: Required to pass schema to SerDe for partitioned tables.
+        // TODO: Pass this automatically from the table to partitions.
+        s"""
+           |ALTER TABLE episodes_part SET SERDEPROPERTIES (
+           |  'avro.schema.literal'='{
+           |    "type": "record",
+           |    "name": "episodes",
+           |    "namespace": "testing.hive.avro.serde",
+           |    "fields": [
+           |      {
+           |          "name": "title",
+           |          "type": "string",
+           |          "doc": "episode title"
+           |      },
+           |      {
+           |          "name": "air_date",
+           |          "type": "string",
+           |          "doc": "initial date"
+           |      },
+           |      {
+           |          "name": "doctor",
+           |          "type": "int",
+           |          "doc": "main actor playing the Doctor in episode"
+           |      }
+           |    ]
+           |  }'
+           |)
+          """.stripMargin.cmd,
+        s"""
+          INSERT OVERWRITE TABLE episodes_part PARTITION (doctor_pt=1)
+          SELECT title, air_date, doctor FROM episodes
+        """.cmd
+      ),
+      TestTable("src_json",
+        s"""CREATE TABLE src_json (json STRING) STORED AS TEXTFILE
+         """.stripMargin.cmd,
+        s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/json.txt")}' INTO TABLE src_json".cmd)
+    )
+
+    hiveQTestUtilTables.foreach(registerTestTable)
+  }

   private val loadedTables = new collection.mutable.HashSet[String]
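Note that registerTestTable only records a table's name and setup commands; nothing is executed at construction time. The loadedTables set above backs the lazy loading path. A sketch of the intended flow, assuming the pre-existing loadTestTable(name) helper (not part of this patch) and an existing SparkContext `sc` with the Hive catalog enabled:

```scala
// Assumes a TestHiveSparkSession built with test tables enabled and the
// pre-existing loadTestTable(name) helper, which consults loadedTables so
// each table's setup commands run at most once.
val spark = new TestHiveSparkSession(sc, loadTestTables = true)
spark.loadTestTable("src")   // runs CREATE TABLE + LOAD DATA for "src"
spark.loadTestTable("src")   // no-op: "src" is already in loadedTables
println(spark.sql("SELECT count(*) FROM src").collect().head)
```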