diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 3c212d656e37111b0d0deaef37e9196877b62707..1b0462359607349b1a3e128bbade885e28184c1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection} import org.apache.spark.sql.catalyst.util.{usePrettyExpression, DateTimeUtils} import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView} +import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.streaming.DataStreamWriter @@ -175,19 +175,13 @@ class Dataset[T] private[sql]( } @transient private[sql] val logicalPlan: LogicalPlan = { - def hasSideEffects(plan: LogicalPlan): Boolean = plan match { - case _: Command | - _: InsertIntoTable => true - case _ => false - } - + // For various commands (like DDL) and queries with side effects, we force query execution + // to happen right away to let these side effects take place eagerly. queryExecution.analyzed match { - // For various commands (like DDL) and queries with side effects, we force query execution - // to happen right away to let these side effects take place eagerly. - case p if hasSideEffects(p) => - LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession) - case Union(children) if children.forall(hasSideEffects) => - LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession) + case c: Command => + LocalRelation(c.output, queryExecution.executedPlan.executeCollect()) + case u @ Union(children) if children.forall(_.isInstanceOf[Command]) => + LocalRelation(u.output, queryExecution.executedPlan.executeCollect()) case _ => queryExecution.analyzed } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 137f7ba04d57256439a017c8928a910299c3ee27..6ec2f4d8408629fdf8bdae3489f66c4c05fd3119 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -125,8 +125,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { // SHOW TABLES in Hive only output table names, while ours outputs database, table name, isTemp. case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] => command.executeCollect().map(_.getString(1)) - case command: ExecutedCommandExec => - command.executeCollect().map(_.getString(0)) case other => val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq // We need the types so we can output struct field names diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 027b1481af96b1478b2d2fbda14764c3181a3dca..20bf4925dbec58e6fdeaa5119aeea35346c43946 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{SaveMode, Strategy} +import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions._ @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec} import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight} import org.apache.spark.sql.execution.streaming._ diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out index 59eb56920cdcff02948607bcc14c66c29665fc17..ba8bc936f0c7925a626e095e149b31315d14b2db 100644 --- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out @@ -196,7 +196,7 @@ SET spark.sql.caseSensitive=false -- !query 19 schema struct<key:string,value:string> -- !query 19 output -spark.sql.caseSensitive +spark.sql.caseSensitive false -- !query 20 @@ -212,7 +212,7 @@ SET spark.sql.caseSensitive=true -- !query 21 schema struct<key:string,value:string> -- !query 21 output -spark.sql.caseSensitive +spark.sql.caseSensitive true -- !query 22 diff --git a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out index c64520ff93c83653e607a8a5a3a47b6a72c744c9..c0930bbde69a46bc123ab233bd6d9c014baf4f54 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out @@ -177,7 +177,7 @@ set spark.sql.groupByOrdinal=false -- !query 17 schema struct<key:string,value:string> -- !query 17 output -spark.sql.groupByOrdinal +spark.sql.groupByOrdinal false -- !query 18 diff --git a/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out index 03a4e72d0fa3e061033d98891e3a3a0cc204d75a..cc47cc67c87c8c7cbf85b63127fedb3e1fe25dc0 100644 --- a/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out @@ -114,7 +114,7 @@ set spark.sql.orderByOrdinal=false -- !query 9 schema struct<key:string,value:string> -- !query 9 output -spark.sql.orderByOrdinal +spark.sql.orderByOrdinal false -- !query 10 diff --git a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out index cc50b9444bb4bdba3ae0366c405ca04a4d575d88..5db3bae5d0379cb6b926c3b98fb5250eee0af47d 100644 --- a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out @@ -63,7 +63,7 @@ set spark.sql.crossJoin.enabled = true -- !query 5 schema struct<key:string,value:string> -- !query 5 output -spark.sql.crossJoin.enabled +spark.sql.crossJoin.enabled true -- !query 6 @@ -85,4 +85,4 @@ set spark.sql.crossJoin.enabled = false -- !query 7 schema struct<key:string,value:string> -- !query 7 output -spark.sql.crossJoin.enabled +spark.sql.crossJoin.enabled false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 40d0ce0992170ac6f44297a655676844aad978a3..03cdfccdda5554794c55d0a184d739d24d2fb742 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -20,8 +20,10 @@ package org.apache.spark.sql import java.io.File import java.math.MathContext import java.sql.Timestamp +import java.util.concurrent.atomic.AtomicBoolean import org.apache.spark.{AccumulatorSuite, SparkException} +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec} @@ -2564,4 +2566,27 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkAnswer(sql(badQuery), Row(1) :: Nil) } + test("SPARK-19650: An action on a Command should not trigger a Spark job") { + // Create a listener that checks if new jobs have started. + val jobStarted = new AtomicBoolean(false) + val listener = new SparkListener { + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobStarted.set(true) + } + } + + // Make sure no spurious job starts are pending in the listener bus. + sparkContext.listenerBus.waitUntilEmpty(500) + sparkContext.addSparkListener(listener) + try { + // Execute the command. + sql("show databases").head() + + // Make sure we have seen all events triggered by DataFrame.show() + sparkContext.listenerBus.waitUntilEmpty(500) + } finally { + sparkContext.removeSparkListener(listener) + } + assert(!jobStarted.get(), "Command should not trigger a Spark job.") + } }