From 03cca5dce2cd7618b5c0e33163efb8502415b06e Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Wed, 30 Sep 2015 14:36:54 -0400
Subject: [PATCH] [SPARK-10770] [SQL] SparkPlan.executeCollect/executeTake should return InternalRow rather than external Row.

Author: Reynold Xin <rxin@databricks.com>

Closes #8900 from rxin/SPARK-10770-1.
---
 .../org/apache/spark/sql/DataFrame.scala      |  2 +-
 .../spark/sql/execution/LocalTableScan.scala  | 10 ++++-----
 .../spark/sql/execution/SparkPlan.scala       | 22 +++++++++++--------
 .../spark/sql/execution/basicOperators.scala  |  7 +++---
 .../apache/spark/sql/execution/commands.scala | 13 ++++++-----
 .../apache/spark/sql/execution/python.scala   | 10 ++++-----
 .../spark/sql/execution/SparkPlanTest.scala   |  2 +-
 .../apache/spark/sql/hive/HiveContext.scala   |  6 ++---
 .../hive/execution/InsertIntoHiveTable.scala  |  6 ++---
 9 files changed, 39 insertions(+), 39 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 9c67ad18c3..01f60aba87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1415,7 +1415,7 @@ class DataFrame private[sql](
    * @since 1.3.0
    */
   def collect(): Array[Row] = withNewExecutionId {
-    queryExecution.executedPlan.executeCollect()
+    queryExecution.executedPlan.executeCollectPublic()
   }

   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 34e926e458..adb6bbc4ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -34,13 +34,11 @@ private[sql] case class LocalTableScan(

   protected override def doExecute(): RDD[InternalRow] = rdd

-  override def executeCollect(): Array[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    rows.map(converter(_).asInstanceOf[Row]).toArray
+  override def executeCollect(): Array[InternalRow] = {
+    rows.toArray
   }

-  override def executeTake(limit: Int): Array[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    rows.map(converter(_).asInstanceOf[Row]).take(limit).toArray
+  override def executeTake(limit: Int): Array[InternalRow] = {
+    rows.take(limit).toArray
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 72f5450510..fcb42047ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -170,11 +170,16 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /**
    * Runs this query returning the result as an array.
    */
-  def executeCollect(): Array[Row] = {
-    execute().mapPartitions { iter =>
-      val converter = CatalystTypeConverters.createToScalaConverter(schema)
-      iter.map(converter(_).asInstanceOf[Row])
-    }.collect()
+  def executeCollect(): Array[InternalRow] = {
+    execute().map(_.copy()).collect()
+  }
+
+  /**
+   * Runs this query returning the result as an array, using external Row format.
+   */
+  def executeCollectPublic(): Array[Row] = {
+    val converter = CatalystTypeConverters.createToScalaConverter(schema)
+    executeCollect().map(converter(_).asInstanceOf[Row])
   }

   /**
@@ -182,9 +187,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    *
    * This is modeled after RDD.take but never runs any job locally on the driver.
    */
-  def executeTake(n: Int): Array[Row] = {
+  def executeTake(n: Int): Array[InternalRow] = {
     if (n == 0) {
-      return new Array[Row](0)
+      return new Array[InternalRow](0)
     }

     val childRDD = execute().map(_.copy())
@@ -218,8 +223,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       partsScanned += numPartsToTry
     }

-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    buf.toArray.map(converter(_).asInstanceOf[Row])
+    buf.toArray
   }

   private[this] def isTesting: Boolean = sys.props.contains("spark.testing")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index bf6d44c098..3e49e0a357 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -204,7 +204,7 @@ case class Limit(limit: Int, child: SparkPlan)
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition

-  override def executeCollect(): Array[Row] = child.executeTake(limit)
+  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)

   protected override def doExecute(): RDD[InternalRow] = {
     val rdd: RDD[_ <: Product2[Boolean, InternalRow]] = if (sortBasedShuffleOn) {
@@ -258,9 +258,8 @@ case class TakeOrderedAndProject(
     projection.map(data.map(_)).getOrElse(data)
   }

-  override def executeCollect(): Array[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    collectData().map(converter(_).asInstanceOf[Row])
+  override def executeCollect(): Array[InternalRow] = {
+    collectData()
   }

   // TODO: Terminal split should be implemented differently from non-terminal split.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index af28e2dfa4..05ccc53830 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -54,20 +54,21 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
    * The `execute()` method of all the physical command classes should reference `sideEffectResult`
    * so that the command can be executed eagerly right after the command query is created.
*/ - protected[sql] lazy val sideEffectResult: Seq[Row] = cmd.run(sqlContext) + protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { + val converter = CatalystTypeConverters.createToCatalystConverter(schema) + cmd.run(sqlContext).map(converter(_).asInstanceOf[InternalRow]) + } override def output: Seq[Attribute] = cmd.output override def children: Seq[SparkPlan] = Nil - override def executeCollect(): Array[Row] = sideEffectResult.toArray + override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray - override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray + override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray protected override def doExecute(): RDD[InternalRow] = { - val convert = CatalystTypeConverters.createToCatalystConverter(schema) - val converted = sideEffectResult.map(convert(_).asInstanceOf[InternalRow]) - sqlContext.sparkContext.parallelize(converted, 1) + sqlContext.sparkContext.parallelize(sideEffectResult, 1) } override def argString: String = cmd.toString diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala index d6aaf424a8..5dbe0fc5f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala @@ -121,12 +121,10 @@ object EvaluatePython { def takeAndServe(df: DataFrame, n: Int): Int = { registerPicklers() - // This is an annoying hack - we should refactor the code so executeCollect and executeTake - // returns InternalRow rather than Row. - val converter = CatalystTypeConverters.createToCatalystConverter(df.schema) - val iter = new SerDeUtil.AutoBatchedPickler(df.take(n).iterator.map { row => - EvaluatePython.toJava(converter(row).asInstanceOf[InternalRow], df.schema) - }) + val iter = new SerDeUtil.AutoBatchedPickler( + df.queryExecution.executedPlan.executeTake(n).iterator.map { row => + EvaluatePython.toJava(row, df.schema) + }) PythonRDD.serveIterator(iter, s"serve-DataFrame") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala index 3d218f01c9..8549a6a0f6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala @@ -245,7 +245,7 @@ object SparkPlanTest { } } ) - resolvedPlan.executeCollect().toSeq + resolvedPlan.executeCollectPublic().toSeq } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index c75025e79a..17de8ef56f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -563,7 +563,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging { /** Extends QueryExecution with hive specific features. */ protected[sql] class QueryExecution(logicalPlan: LogicalPlan) - extends super.QueryExecution(logicalPlan) { + extends org.apache.spark.sql.execution.QueryExecution(this, logicalPlan) { /** * Returns the result as a hive compatible sequence of strings. 
For native commands, the @@ -581,10 +581,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging { .mkString("\t") } case command: ExecutedCommand => - command.executeCollect().map(_(0).toString) + command.executeCollect().map(_.getString(0)) case other => - val result: Seq[Seq[Any]] = other.executeCollect().map(_.toSeq).toSeq + val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq // We need the types so we can output struct field names val types = analyzed.output.map(_.dataType) // Reformat to match hive tab delimited output. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 0c700bdb37..f936cf565b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -124,7 +124,7 @@ case class InsertIntoHiveTable( * * Note: this is run once and then kept to avoid double insertions. */ - protected[sql] lazy val sideEffectResult: Seq[Row] = { + protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer // instances within the closure, since Serializer is not serializable while TableDesc is. val tableDesc = table.tableDesc @@ -267,10 +267,10 @@ case class InsertIntoHiveTable( // however for now we return an empty list to simplify compatibility checks with hive, which // does not return anything for insert operations. // TODO: implement hive compatibility as rules. - Seq.empty[Row] + Seq.empty[InternalRow] } - override def executeCollect(): Array[Row] = sideEffectResult.toArray + override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray protected override def doExecute(): RDD[InternalRow] = { sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1) -- GitLab
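What follows is a minimal sketch, not part of the commit above, of what the executeCollect()/executeCollectPublic() split means for a caller. It assumes `plan` is some already-constructed physical plan node and that the snippet lives alongside Spark's own SQL code from this era; these are internal APIs, so treat it as illustrative of the patch rather than user-facing code. The conversion it performs is the same one the new executeCollectPublic() uses.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    import org.apache.spark.sql.execution.SparkPlan

    // Collect a physical plan's result both ways. `plan` is a placeholder for
    // any SparkPlan produced by query planning (hypothetical here).
    def collectBothWays(plan: SparkPlan): (Array[InternalRow], Array[Row]) = {
      // After this patch, executeCollect() stays in the internal (Catalyst) row
      // format and skips the per-row conversion to external rows.
      val internal: Array[InternalRow] = plan.executeCollect()

      // Callers that need the external Row format convert once at the boundary.
      // This mirrors the body of the new executeCollectPublic(); calling
      // plan.executeCollectPublic() directly is equivalent.
      val toScala = CatalystTypeConverters.createToScalaConverter(plan.schema)
      val public: Array[Row] = internal.map(toScala(_).asInstanceOf[Row])

      (internal, public)
    }

In the patch itself, DataFrame.collect() keeps returning external Rows by routing through executeCollectPublic(), while internal consumers such as EvaluatePython.takeAndServe and Limit.executeCollect stay in InternalRow and avoid the conversion entirely.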