-
- Downloads
[SPARK-2094][SQL] "Exactly once" semantics for DDL and command statements
## Related JIRA issues - Main issue: - [SPARK-2094](https://issues.apache.org/jira/browse/SPARK-2094): Ensure exactly once semantics for DDL/Commands - Issues resolved as dependencies: - [SPARK-2081](https://issues.apache.org/jira/browse/SPARK-2081): Undefine output() from the abstract class Command and implement it in concrete subclasses - [SPARK-2128](https://issues.apache.org/jira/browse/SPARK-2128): No plan for DESCRIBE - [SPARK-1852](https://issues.apache.org/jira/browse/SPARK-1852): SparkSQL Queries with Sorts run before the user asks them to - Other related issue: - [SPARK-2129](https://issues.apache.org/jira/browse/SPARK-2129): NPE thrown while lookup a view Two test cases, `join_view` and `mergejoin_mixed`, within the `HiveCompatibilitySuite` are removed from the whitelist to workaround this issue. ## PR Overview This PR defines physical plans for DDL statements and commands and wraps their side effects in a lazy field `PhysicalCommand.sideEffectResult`, so that they are executed eagerly and exactly once. Also, as a positive side effect, now DDL statements and commands can be turned into proper `SchemaRDD`s and let user query the execution results. This PR defines schemas for the following DDL/commands: - EXPLAIN command - `plan`: String, the plan explanation - SET command - `key`: String, the key(s) of the propert(y/ies) being set or queried - `value`: String, the value(s) of the propert(y/ies) being queried - Other Hive native command - `result`: String, execution result returned by Hive **NOTE**: We should refine schemas for different native commands by defining physical plans for them in the future. ## Examples ### EXPLAIN command Take the "EXPLAIN" command as an example, we first execute the command and obtain a `SchemaRDD` at the same time, then query the `plan` field with the schema DSL: ``` scala> loadTestTable("src") ... scala> val q0 = hql("EXPLAIN SELECT key, COUNT(*) FROM src GROUP BY key") ... q0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:98 == Query Plan == ExplainCommandPhysical [plan#11:0] Aggregate false, [key#4], [key#4,SUM(PartialCount#6L) AS c_1#2L] Exchange (HashPartitioning [key#4:0], 200) Exchange (HashPartitioning [key#4:0], 200) Aggregate true, [key#4], [key#4,COUNT(1) AS PartialCount#6L] HiveTableScan [key#4], (MetastoreRelation default, src, None), None scala> q0.select('plan).collect() ... [ExplainCommandPhysical [plan#24:0] Aggregate false, [key#17], [key#17,SUM(PartialCount#19L) AS c_1#2L] Exchange (HashPartitioning [key#17:0], 200) Exchange (HashPartitioning [key#17:0], 200) Aggregate true, [key#17], [key#17,COUNT(1) AS PartialCount#19L] HiveTableScan [key#17], (MetastoreRelation default, src, None), None] scala> ``` ### SET command In this example we query all the properties set in `SQLConf`, register the result as a table, and then query the table with HiveQL: ``` scala> val q1 = hql("SET") ... q1: org.apache.spark.sql.SchemaRDD = SchemaRDD[7] at RDD at SchemaRDD.scala:98 == Query Plan == <SET command: executed by Hive, and noted by SQLContext> scala> q1.registerAsTable("properties") scala> hql("SELECT key, value FROM properties ORDER BY key LIMIT 10").foreach(println) ... == Query Plan == TakeOrdered 10, [key#51:0 ASC] Project [key#51:0,value#52:1] SetCommandPhysical None, None, [key#55:0,value#56:1]), which has no missing parents 14/06/12 12:19:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 5 (SchemaRDD[21] at RDD at SchemaRDD.scala:98 == Query Plan == TakeOrdered 10, [key#51:0 ASC] Project [key#51:0,value#52:1] SetCommandPhysical None, None, [key#55:0,value#56:1]) ... [datanucleus.autoCreateSchema,true] [datanucleus.autoStartMechanismMode,checked] [datanucleus.cache.level2,false] [datanucleus.cache.level2.type,none] [datanucleus.connectionPoolingType,BONECP] [datanucleus.fixedDatastore,false] [datanucleus.identifierFactory,datanucleus1] [datanucleus.plugin.pluginRegistryBundleCheck,LOG] [datanucleus.rdbms.useLegacyNativeValueStrategy,true] [datanucleus.storeManagerType,rdbms] scala> ``` ### "Exactly once" semantics At last, an example of the "exactly once" semantics: ``` scala> val q2 = hql("CREATE TABLE t1(key INT, value STRING)") ... q2: org.apache.spark.sql.SchemaRDD = SchemaRDD[28] at RDD at SchemaRDD.scala:98 == Query Plan == <Native command: executed by Hive> scala> table("t1") ... res9: org.apache.spark.sql.SchemaRDD = SchemaRDD[32] at RDD at SchemaRDD.scala:98 == Query Plan == HiveTableScan [key#58,value#59], (MetastoreRelation default, t1, None), None scala> q2.collect() ... res10: Array[org.apache.spark.sql.Row] = Array([]) scala> ``` As we can see, the "CREATE TABLE" command is executed eagerly right after the `SchemaRDD` is created, and referencing the `SchemaRDD` again won't trigger a duplicated execution. Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #1071 from liancheng/exactlyOnceCommand and squashes the following commits: d005b03 [Cheng Lian] Made "SET key=value" returns the newly set key value pair f6c7715 [Cheng Lian] Added test cases for DDL/command statement RDDs 1d00937 [Cheng Lian] Makes SchemaRDD DSLs work for DDL/command statement RDDs 5c7e680 [Cheng Lian] Bug fix: wrong type used in pattern matching 48aa2e5 [Cheng Lian] Refined SQLContext.emptyResult as an empty RDD[Row] cc64f32 [Cheng Lian] Renamed physical plan classes for DDL/commands 74789c1 [Cheng Lian] Fixed failing test cases 0ad343a [Cheng Lian] Added physical plan for DDL and commands to ensure the "exactly once" semantics
Showing
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala 10 additions, 8 deletions...rg/apache/spark/sql/catalyst/plans/logical/commands.scala
- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala 1 addition, 1 deletion...he/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 5 additions, 34 deletions...core/src/main/scala/org/apache/spark/sql/SQLContext.scala
- sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala 1 addition, 1 deletionsql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
- sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala 13 additions, 2 deletions...e/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
- sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala 1 addition, 1 deletion...n/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 5 additions, 6 deletions...cala/org/apache/spark/sql/execution/SparkStrategies.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala 53 additions, 28 deletions.../main/scala/org/apache/spark/sql/execution/commands.scala
- sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 8 additions, 8 deletions...e/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 16 additions, 39 deletions...rc/main/scala/org/apache/spark/sql/hive/HiveContext.scala
- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 8 additions, 0 deletions...main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala 28 additions, 4 deletions...a/org/apache/spark/sql/hive/execution/hiveOperators.scala
- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala 2 additions, 1 deletion.../apache/spark/sql/hive/execution/HiveComparisonTest.scala
- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala 6 additions, 3 deletions...che/spark/sql/hive/execution/HiveCompatibilitySuite.scala
- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala 94 additions, 31 deletions.../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
Loading
Please register or sign in to comment