diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 8c984b36b79e50cde56f9c4949cf409fab185a24..4cfdf799f6f424206896f861b743142a230f1cce 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -492,7 +492,7 @@ class HiveContext(SQLContext):
         confusing error messages.
         """
         jsc = sparkContext._jsc.sc()
-        jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc)
+        jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc, False)
         return cls(sparkContext, jtestHive)
 
     def refreshTable(self, tableName):
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index b45be0251d953abb97e10fd2f6fdc380914b6a56..7f892047c7075ab58aced8199d79cfb7b5756c86 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -73,8 +73,12 @@ class TestHiveContext(
     @transient override val sparkSession: TestHiveSparkSession)
   extends SQLContext(sparkSession) {
 
-  def this(sc: SparkContext) {
-    this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc)))
+  /**
+   * If loadTestTables is false, no test tables are loaded. Note that this flag can only be true
+   * when running in the JVM, i.e. it needs to be false when calling from Python.
+   */
+  def this(sc: SparkContext, loadTestTables: Boolean = true) {
+    this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc), loadTestTables))
   }
 
   override def newSession(): TestHiveContext = {
@@ -103,13 +107,24 @@ class TestHiveContext(
 
 }
 
-
+/**
+ * A [[SparkSession]] used in [[TestHiveContext]].
+ *
+ * @param sc SparkContext
+ * @param warehousePath path to the Hive warehouse directory
+ * @param scratchDirPath scratch directory used by Hive's metastore client
+ * @param metastoreTemporaryConf configuration options for Hive's metastore
+ * @param existingSharedState optional [[TestHiveSharedState]]
+ * @param loadTestTables if true, load the test tables. They can only be loaded when running
+ *                       in the JVM, i.e. when calling from Python this flag has to be false.
+ */
 private[hive] class TestHiveSparkSession(
     @transient private val sc: SparkContext,
     val warehousePath: File,
     scratchDirPath: File,
     metastoreTemporaryConf: Map[String, String],
-    @transient private val existingSharedState: Option[TestHiveSharedState])
+    @transient private val existingSharedState: Option[TestHiveSharedState],
+    private val loadTestTables: Boolean)
   extends SparkSession(sc) with Logging { self =>
 
   // TODO: We need to set the temp warehouse path to sc's conf.
@@ -118,13 +133,14 @@ private[hive] class TestHiveSparkSession(
   // when we creating metadataHive. This flow is not easy to follow and can introduce
   // confusion when a developer is debugging an issue. We need to refactor this part
   // to just set the temp warehouse path in sc's conf.
-  def this(sc: SparkContext) {
+  def this(sc: SparkContext, loadTestTables: Boolean) {
     this(
       sc,
       Utils.createTempDir(namePrefix = "warehouse"),
       TestHiveContext.makeScratchDir(),
       HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false),
-      None)
+      None,
+      loadTestTables)
   }
 
   assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive")
@@ -144,7 +160,7 @@ private[hive] class TestHiveSparkSession(
 
   override def newSession(): TestHiveSparkSession = {
     new TestHiveSparkSession(
-      sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState))
+      sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState), loadTestTables)
   }
 
   private var cacheTables: Boolean = false
@@ -204,165 +220,173 @@ private[hive] class TestHiveSparkSession(
     testTables += (testTable.name -> testTable)
   }
 
-  // The test tables that are defined in the Hive QTestUtil.
-  // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
-  // https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql
-  @transient
-  val hiveQTestUtilTables = Seq(
-    TestTable("src",
-      "CREATE TABLE src (key INT, value STRING)".cmd,
-      s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),
-    TestTable("src1",
-      "CREATE TABLE src1 (key INT, value STRING)".cmd,
-      s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
-    TestTable("srcpart", () => {
-      sql(
-        "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
-      for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
-        sql(
-          s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
-             |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')
-           """.stripMargin)
-      }
-    }),
-    TestTable("srcpart1", () => {
-      sql(
-        "CREATE TABLE srcpart1 (key INT, value STRING) PARTITIONED BY (ds STRING, hr INT)")
-      for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- 11 to 12) {
-        sql(
-          s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
-             |OVERWRITE INTO TABLE srcpart1 PARTITION (ds='$ds',hr='$hr')
-           """.stripMargin)
-      }
-    }),
-    TestTable("src_thrift", () => {
-      import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer
-      import org.apache.hadoop.mapred.{SequenceFileInputFormat, SequenceFileOutputFormat}
-      import org.apache.thrift.protocol.TBinaryProtocol
-
-      sql(
-        s"""
-           |CREATE TABLE src_thrift(fake INT)
-           |ROW FORMAT SERDE '${classOf[ThriftDeserializer].getName}'
-           |WITH SERDEPROPERTIES(
-           |  'serialization.class'='org.apache.spark.sql.hive.test.Complex',
-           |  'serialization.format'='${classOf[TBinaryProtocol].getName}'
-           |)
-           |STORED AS
-           |INPUTFORMAT '${classOf[SequenceFileInputFormat[_, _]].getName}'
-           |OUTPUTFORMAT '${classOf[SequenceFileOutputFormat[_, _]].getName}'
-         """.stripMargin)
-
-      sql(
-        s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/complex.seq")}' INTO TABLE src_thrift")
-    }),
-    TestTable("serdeins",
-      s"""CREATE TABLE serdeins (key INT, value STRING)
-         |ROW FORMAT SERDE '${classOf[LazySimpleSerDe].getCanonicalName}'
-         |WITH SERDEPROPERTIES ('field.delim'='\\t')
-       """.stripMargin.cmd,
-      "INSERT OVERWRITE TABLE serdeins SELECT * FROM src".cmd),
-    TestTable("episodes",
-      s"""CREATE TABLE episodes (title STRING, air_date STRING, doctor INT)
-         |STORED AS avro
-         |TBLPROPERTIES (
-         |  'avro.schema.literal'='{
-         |    "type": "record",
-         |    "name": "episodes",
-         |    "namespace": "testing.hive.avro.serde",
-         |    "fields": [
-         |      {
-         |        "name": "title",
-         |        "type": "string",
-         |        "doc": "episode title"
-         |      },
-         |      {
-         |        "name": "air_date",
-         |        "type": "string",
-         |        "doc": "initial date"
-         |      },
-         |      {
-         |        "name": "doctor",
-         |        "type": "int",
-         |        "doc": "main actor playing the Doctor in episode"
-         |      }
-         |    ]
-         |  }'
-         |)
-       """.stripMargin.cmd,
-      s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/episodes.avro")}' INTO TABLE episodes".cmd
-    ),
-    // THIS TABLE IS NOT THE SAME AS THE HIVE TEST TABLE episodes_partitioned AS DYNAMIC
-    // PARTITIONING IS NOT YET SUPPORTED
-    TestTable("episodes_part",
-      s"""CREATE TABLE episodes_part (title STRING, air_date STRING, doctor INT)
-         |PARTITIONED BY (doctor_pt INT)
-         |STORED AS avro
-         |TBLPROPERTIES (
-         |  'avro.schema.literal'='{
-         |    "type": "record",
-         |    "name": "episodes",
-         |    "namespace": "testing.hive.avro.serde",
-         |    "fields": [
-         |      {
-         |        "name": "title",
-         |        "type": "string",
-         |        "doc": "episode title"
-         |      },
-         |      {
-         |        "name": "air_date",
-         |        "type": "string",
-         |        "doc": "initial date"
-         |      },
-         |      {
-         |        "name": "doctor",
-         |        "type": "int",
-         |        "doc": "main actor playing the Doctor in episode"
-         |      }
-         |    ]
-         |  }'
-         |)
-       """.stripMargin.cmd,
-      // WORKAROUND: Required to pass schema to SerDe for partitioned tables.
-      // TODO: Pass this automatically from the table to partitions.
-      s"""
-         |ALTER TABLE episodes_part SET SERDEPROPERTIES (
-         |  'avro.schema.literal'='{
-         |    "type": "record",
-         |    "name": "episodes",
-         |    "namespace": "testing.hive.avro.serde",
-         |    "fields": [
-         |      {
-         |        "name": "title",
-         |        "type": "string",
-         |        "doc": "episode title"
-         |      },
-         |      {
-         |        "name": "air_date",
-         |        "type": "string",
-         |        "doc": "initial date"
-         |      },
-         |      {
-         |        "name": "doctor",
-         |        "type": "int",
-         |        "doc": "main actor playing the Doctor in episode"
-         |      }
-         |    ]
-         |  }'
-         |)
-       """.stripMargin.cmd,
-      s"""
-        INSERT OVERWRITE TABLE episodes_part PARTITION (doctor_pt=1)
-        SELECT title, air_date, doctor FROM episodes
-      """.cmd
-    ),
-    TestTable("src_json",
-      s"""CREATE TABLE src_json (json STRING) STORED AS TEXTFILE
-       """.stripMargin.cmd,
-      s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/json.txt")}' INTO TABLE src_json".cmd)
-  )
-
-  hiveQTestUtilTables.foreach(registerTestTable)
+  if (loadTestTables) {
+    // The test tables that are defined in the Hive QTestUtil.
+    // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+    // https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql
+    @transient
+    val hiveQTestUtilTables: Seq[TestTable] = Seq(
+      TestTable("src",
+        "CREATE TABLE src (key INT, value STRING)".cmd,
+        s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),
+      TestTable("src1",
+        "CREATE TABLE src1 (key INT, value STRING)".cmd,
+        s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
+      TestTable("srcpart", () => {
+        sql(
+          "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
+        for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
+          sql(
+            s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
+               |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')
+             """.stripMargin)
+        }
+      }),
+      TestTable("srcpart1", () => {
+        sql(
+          "CREATE TABLE srcpart1 (key INT, value STRING) PARTITIONED BY (ds STRING, hr INT)")
+        for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- 11 to 12) {
+          sql(
+            s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
+               |OVERWRITE INTO TABLE srcpart1 PARTITION (ds='$ds',hr='$hr')
+             """.stripMargin)
+        }
+      }),
+      TestTable("src_thrift", () => {
+        import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer
+        import org.apache.hadoop.mapred.{SequenceFileInputFormat, SequenceFileOutputFormat}
+        import org.apache.thrift.protocol.TBinaryProtocol
+
+        sql(
+          s"""
+             |CREATE TABLE src_thrift(fake INT)
+             |ROW FORMAT SERDE '${classOf[ThriftDeserializer].getName}'
+             |WITH SERDEPROPERTIES(
+             |  'serialization.class'='org.apache.spark.sql.hive.test.Complex',
+             |  'serialization.format'='${classOf[TBinaryProtocol].getName}'
+             |)
+             |STORED AS
+             |INPUTFORMAT '${classOf[SequenceFileInputFormat[_, _]].getName}'
+             |OUTPUTFORMAT '${classOf[SequenceFileOutputFormat[_, _]].getName}'
+           """.stripMargin)
+
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '${getHiveFile("data/files/complex.seq")}'
+             |INTO TABLE src_thrift
+           """.stripMargin)
+      }),
+      TestTable("serdeins",
+        s"""CREATE TABLE serdeins (key INT, value STRING)
+           |ROW FORMAT SERDE '${classOf[LazySimpleSerDe].getCanonicalName}'
+           |WITH SERDEPROPERTIES ('field.delim'='\\t')
+         """.stripMargin.cmd,
+        "INSERT OVERWRITE TABLE serdeins SELECT * FROM src".cmd),
+      TestTable("episodes",
+        s"""CREATE TABLE episodes (title STRING, air_date STRING, doctor INT)
+           |STORED AS avro
+           |TBLPROPERTIES (
+           |  'avro.schema.literal'='{
+           |    "type": "record",
+           |    "name": "episodes",
+           |    "namespace": "testing.hive.avro.serde",
+           |    "fields": [
+           |      {
+           |        "name": "title",
+           |        "type": "string",
+           |        "doc": "episode title"
+           |      },
+           |      {
+           |        "name": "air_date",
+           |        "type": "string",
+           |        "doc": "initial date"
+           |      },
+           |      {
+           |        "name": "doctor",
+           |        "type": "int",
+           |        "doc": "main actor playing the Doctor in episode"
+           |      }
+           |    ]
+           |  }'
+           |)
+         """.stripMargin.cmd,
+        s"""
+           |LOAD DATA LOCAL INPATH '${getHiveFile("data/files/episodes.avro")}'
+           |INTO TABLE episodes
+         """.stripMargin.cmd
+      ),
+      // THIS TABLE IS NOT THE SAME AS THE HIVE TEST TABLE episodes_partitioned AS DYNAMIC
+      // PARTITIONING IS NOT YET SUPPORTED
+      TestTable("episodes_part",
+        s"""CREATE TABLE episodes_part (title STRING, air_date STRING, doctor INT)
+           |PARTITIONED BY (doctor_pt INT)
+           |STORED AS avro
+           |TBLPROPERTIES (
+           |  'avro.schema.literal'='{
+           |    "type": "record",
+           |    "name": "episodes",
+           |    "namespace": "testing.hive.avro.serde",
+           |    "fields": [
+           |      {
+           |        "name": "title",
+           |        "type": "string",
+           |        "doc": "episode title"
+           |      },
+           |      {
+           |        "name": "air_date",
+           |        "type": "string",
+           |        "doc": "initial date"
+           |      },
+           |      {
+           |        "name": "doctor",
+           |        "type": "int",
+           |        "doc": "main actor playing the Doctor in episode"
+           |      }
+           |    ]
+           |  }'
+           |)
+         """.stripMargin.cmd,
+        // WORKAROUND: Required to pass schema to SerDe for partitioned tables.
+        // TODO: Pass this automatically from the table to partitions.
+        s"""
+           |ALTER TABLE episodes_part SET SERDEPROPERTIES (
+           |  'avro.schema.literal'='{
+           |    "type": "record",
+           |    "name": "episodes",
+           |    "namespace": "testing.hive.avro.serde",
+           |    "fields": [
+           |      {
+           |        "name": "title",
+           |        "type": "string",
+           |        "doc": "episode title"
+           |      },
+           |      {
+           |        "name": "air_date",
+           |        "type": "string",
+           |        "doc": "initial date"
+           |      },
+           |      {
+           |        "name": "doctor",
+           |        "type": "int",
+           |        "doc": "main actor playing the Doctor in episode"
+           |      }
+           |    ]
+           |  }'
+           |)
+         """.stripMargin.cmd,
+        s"""
+          INSERT OVERWRITE TABLE episodes_part PARTITION (doctor_pt=1)
+          SELECT title, air_date, doctor FROM episodes
+        """.cmd
+      ),
+      TestTable("src_json",
+        s"""CREATE TABLE src_json (json STRING) STORED AS TEXTFILE
+         """.stripMargin.cmd,
+        s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/json.txt")}' INTO TABLE src_json".cmd)
+    )
+
+    hiveQTestUtilTables.foreach(registerTestTable)
+  }
 
   private val loadedTables = new collection.mutable.HashSet[String]
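For reviewers, a quick usage sketch of the new constructor. This is illustrative only, not part of the patch: `sc` is a hypothetical placeholder for a SparkContext whose conf sets `spark.sql.catalogImplementation` to `hive`, which `TestHiveSparkSession` asserts on construction.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.test.TestHiveContext

// Placeholder: stands in for a properly configured SparkContext.
val sc: SparkContext = ???

// JVM callers hit the default argument, so existing behavior is unchanged:
// the QTestUtil test tables are registered eagerly.
val withTables = new TestHiveContext(sc)

// Callers that should skip table loading pass the flag explicitly,
// mirroring pyspark's `TestHiveContext(jsc, False)` in the hunk above.
val withoutTables = new TestHiveContext(sc, false)
```

The explicit `False` on the Python side is required because Py4J invokes the JVM constructor reflectively and cannot see Scala default parameter values, so the pyspark caller must always supply both arguments.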