Skip to content
Snippets Groups Projects
Commit 273afcb2 authored by Cheng Lian's avatar Cheng Lian Committed by Michael Armbrust
Browse files

[SQL][SPARK-2094] Follow up of PR #1071 for Java API

Updated `JavaSQLContext` and `JavaHiveContext` similar to what we've done to `SQLContext` and `HiveContext` in PR #1071. Added corresponding test case for Spark SQL Java API.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1085 from liancheng/spark-2094-java and squashes the following commits:

29b8a51 [Cheng Lian] Avoided instantiating JavaSparkContext & JavaHiveContext to workaround test failure
92bb4fb [Cheng Lian] Marked test cases in JavaHiveQLSuite with "ignore"
22aec97 [Cheng Lian] Follow up of PR #1071 for Java API
parent cdf2b045
No related branches found
No related tags found
No related merge requests found
......@@ -40,19 +40,13 @@ class JavaSQLContext(val sqlContext: SQLContext) {
/**
* Executes a query expressed in SQL, returning the result as a JavaSchemaRDD
*/
def sql(sqlQuery: String): JavaSchemaRDD = {
val result = new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlQuery))
// We force query optimization to happen right away instead of letting it happen lazily like
// when using the query DSL. This is so DDL commands behave as expected. This is only
// generates the RDD lineage for DML queries, but do not perform any execution.
result.queryExecution.toRdd
result
}
def sql(sqlQuery: String): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlQuery))
/**
* :: Experimental ::
* Creates an empty parquet file with the schema of class `beanClass`, which can be registered as
* a table. This registered table can be used as the target of future insertInto` operations.
* a table. This registered table can be used as the target of future `insertInto` operations.
*
* {{{
* JavaSQLContext sqlCtx = new JavaSQLContext(...)
......@@ -62,7 +56,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
* }}}
*
* @param beanClass A java bean class object that will be used to determine the schema of the
* parquet file. s
* parquet file.
* @param path The path where the directory containing parquet metadata should be created.
* Data inserted into this table will also be stored at this location.
* @param allowExisting When false, an exception will be thrown if this directory already exists.
......@@ -100,14 +94,12 @@ class JavaSQLContext(val sqlContext: SQLContext) {
new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
}
/**
* Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
*/
def parquetFile(path: String): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, ParquetRelation(path))
/**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
* during the lifetime of this instance of SQLContext.
......
......@@ -31,12 +31,6 @@ class JavaHiveContext(sparkContext: JavaSparkContext) extends JavaSQLContext(spa
/**
* Executes a query expressed in HiveQL, returning the result as a JavaSchemaRDD.
*/
def hql(hqlQuery: String): JavaSchemaRDD = {
val result = new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
// We force query optimization to happen right away instead of letting it happen lazily like
// when using the query DSL. This is so DDL commands behave as expected. This is only
// generates the RDD lineage for DML queries, but do not perform any execution.
result.queryExecution.toRdd
result
}
def hql(hqlQuery: String): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
}
......@@ -17,25 +17,85 @@
package org.apache.spark.sql.hive.api.java
import scala.util.Try
import org.scalatest.FunSuite
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.api.java.JavaSchemaRDD
import org.apache.spark.sql.execution.ExplainCommand
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.test.TestSQLContext
// Implicits
import scala.collection.JavaConversions._
class JavaHiveSQLSuite extends FunSuite {
class JavaHiveQLSuite extends FunSuite {
lazy val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext)
// There is a little trickery here to avoid instantiating two HiveContexts in the same JVM
lazy val javaHiveCtx = new JavaHiveContext(javaCtx) {
override val sqlContext = TestHive
}
ignore("SELECT * FROM src") {
val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext)
// There is a little trickery here to avoid instantiating two HiveContexts in the same JVM
val javaSqlCtx = new JavaHiveContext(javaCtx) {
override val sqlContext = TestHive
assert(
javaHiveCtx.hql("SELECT * FROM src").collect().map(_.getInt(0)) ===
TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq)
}
private val explainCommandClassName =
classOf[ExplainCommand].getSimpleName.stripSuffix("$")
def isExplanation(result: JavaSchemaRDD) = {
val explanation = result.collect().map(_.getString(0))
explanation.size == 1 && explanation.head.startsWith(explainCommandClassName)
}
ignore("Query Hive native command execution result") {
val tableName = "test_native_commands"
assertResult(0) {
javaHiveCtx.hql(s"DROP TABLE IF EXISTS $tableName").count()
}
assertResult(0) {
javaHiveCtx.hql(s"CREATE TABLE $tableName(key INT, value STRING)").count()
}
javaHiveCtx.hql("SHOW TABLES").registerAsTable("show_tables")
assert(
javaSqlCtx.hql("SELECT * FROM src").collect().map(_.getInt(0)) ===
TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq)
javaHiveCtx
.hql("SELECT result FROM show_tables")
.collect()
.map(_.getString(0))
.contains(tableName))
assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) {
javaHiveCtx.hql(s"DESCRIBE $tableName").registerAsTable("describe_table")
javaHiveCtx
.hql("SELECT result FROM describe_table")
.collect()
.map(_.getString(0).split("\t").map(_.trim))
.toArray
}
assert(isExplanation(javaHiveCtx.hql(
s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")))
TestHive.reset()
}
ignore("Exactly once semantics for DDL and command statements") {
val tableName = "test_exactly_once"
val q0 = javaHiveCtx.hql(s"CREATE TABLE $tableName(key INT, value STRING)")
// If the table was not created, the following assertion would fail
assert(Try(TestHive.table(tableName)).isSuccess)
// If the CREATE TABLE command got executed again, the following assertion would fail
assert(Try(q0.count()).isSuccess)
}
}
......@@ -184,25 +184,29 @@ class HiveQuerySuite extends HiveComparisonTest {
test("Query Hive native command execution result") {
val tableName = "test_native_commands"
val q0 = hql(s"DROP TABLE IF EXISTS $tableName")
assert(q0.count() == 0)
assertResult(0) {
hql(s"DROP TABLE IF EXISTS $tableName").count()
}
val q1 = hql(s"CREATE TABLE $tableName(key INT, value STRING)")
assert(q1.count() == 0)
assertResult(0) {
hql(s"CREATE TABLE $tableName(key INT, value STRING)").count()
}
val q2 = hql("SHOW TABLES")
val tables = q2.select('result).collect().map { case Row(table: String) => table }
assert(tables.contains(tableName))
assert(
hql("SHOW TABLES")
.select('result)
.collect()
.map(_.getString(0))
.contains(tableName))
val q3 = hql(s"DESCRIBE $tableName")
assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) {
q3.select('result).collect().map { case Row(fieldDesc: String) =>
fieldDesc.split("\t").map(_.trim)
}
hql(s"DESCRIBE $tableName")
.select('result)
.collect()
.map(_.getString(0).split("\t").map(_.trim))
}
val q4 = hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")
assert(isExplanation(q4))
assert(isExplanation(hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")))
TestHive.reset()
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment