diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 9c67ad18c3d276f6af43bffff3c639f4387b651e..01f60aba87ede17843520ac141fa458b1978506a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1415,7 +1415,7 @@ class DataFrame private[sql](
    * @since 1.3.0
    */
   def collect(): Array[Row] = withNewExecutionId {
-    queryExecution.executedPlan.executeCollect()
+    queryExecution.executedPlan.executeCollectPublic()
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 34e926e4582be2ff25c082d0adc462291b15bb72..adb6bbc4acc5bb4fea34a84b953d4f738d189ade 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -34,13 +34,11 @@ private[sql] case class LocalTableScan(
 
   protected override def doExecute(): RDD[InternalRow] = rdd
 
-  override def executeCollect(): Array[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    rows.map(converter(_).asInstanceOf[Row]).toArray
+  override def executeCollect(): Array[InternalRow] = {
+    rows.toArray
   }
 
-  override def executeTake(limit: Int): Array[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    rows.map(converter(_).asInstanceOf[Row]).take(limit).toArray
+  override def executeTake(limit: Int): Array[InternalRow] = {
+    rows.take(limit).toArray
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 72f5450510a104704d4c4846dc736fee5c860c26..fcb42047ffe60f859aa29982680a9ec72c01d0fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -170,11 +170,16 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /**
    * Runs this query returning the result as an array.
    */
-  def executeCollect(): Array[Row] = {
-    execute().mapPartitions { iter =>
-      val converter = CatalystTypeConverters.createToScalaConverter(schema)
-      iter.map(converter(_).asInstanceOf[Row])
-    }.collect()
+  def executeCollect(): Array[InternalRow] = {
+    execute().map(_.copy()).collect()
+  }
+
+  /**
+   * Runs this query returning the result as an array, using external Row format.
+   */
+  def executeCollectPublic(): Array[Row] = {
+    val converter = CatalystTypeConverters.createToScalaConverter(schema)
+    executeCollect().map(converter(_).asInstanceOf[Row])
   }
 
   /**
@@ -182,9 +187,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    *
    * This is modeled after RDD.take but never runs any job locally on the driver.
   */
-  def executeTake(n: Int): Array[Row] = {
+  def executeTake(n: Int): Array[InternalRow] = {
     if (n == 0) {
-      return new Array[Row](0)
+      return new Array[InternalRow](0)
     }
 
     val childRDD = execute().map(_.copy())
@@ -218,8 +223,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       partsScanned += numPartsToTry
     }
 
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    buf.toArray.map(converter(_).asInstanceOf[Row])
+    buf.toArray
   }
 
   private[this] def isTesting: Boolean = sys.props.contains("spark.testing")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index bf6d44c098ee3c8637beb28ebd6b54eeb1654dfa..3e49e0a35741d6eb087bee78cbe96f6b5e4ae155 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -204,7 +204,7 @@ case class Limit(limit: Int, child: SparkPlan)
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition
 
-  override def executeCollect(): Array[Row] = child.executeTake(limit)
+  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
 
   protected override def doExecute(): RDD[InternalRow] = {
     val rdd: RDD[_ <: Product2[Boolean, InternalRow]] = if (sortBasedShuffleOn) {
@@ -258,9 +258,8 @@ case class TakeOrderedAndProject(
     projection.map(data.map(_)).getOrElse(data)
   }
 
-  override def executeCollect(): Array[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    collectData().map(converter(_).asInstanceOf[Row])
+  override def executeCollect(): Array[InternalRow] = {
+    collectData()
   }
 
   // TODO: Terminal split should be implemented differently from non-terminal split.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index af28e2dfa4186e3c2508cb08cba5d03ac8273487..05ccc53830bd152c47476b4cbb48b6474ff6b4dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -54,20 +54,21 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
    * The `execute()` method of all the physical command classes should reference `sideEffectResult`
    * so that the command can be executed eagerly right after the command query is created.
   */
-  protected[sql] lazy val sideEffectResult: Seq[Row] = cmd.run(sqlContext)
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+    cmd.run(sqlContext).map(converter(_).asInstanceOf[InternalRow])
+  }
 
   override def output: Seq[Attribute] = cmd.output
 
   override def children: Seq[SparkPlan] = Nil
 
-  override def executeCollect(): Array[Row] = sideEffectResult.toArray
+  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
 
-  override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray
+  override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val convert = CatalystTypeConverters.createToCatalystConverter(schema)
-    val converted = sideEffectResult.map(convert(_).asInstanceOf[InternalRow])
-    sqlContext.sparkContext.parallelize(converted, 1)
+    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
   }
 
   override def argString: String = cmd.toString
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
index d6aaf424a89359aed2673187f325b01808da44b2..5dbe0fc5f95c761ccb5d10b112889ae4ac6e93c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
@@ -121,12 +121,10 @@ object EvaluatePython {
 
   def takeAndServe(df: DataFrame, n: Int): Int = {
     registerPicklers()
-    // This is an annoying hack - we should refactor the code so executeCollect and executeTake
-    // returns InternalRow rather than Row.
-    val converter = CatalystTypeConverters.createToCatalystConverter(df.schema)
-    val iter = new SerDeUtil.AutoBatchedPickler(df.take(n).iterator.map { row =>
-      EvaluatePython.toJava(converter(row).asInstanceOf[InternalRow], df.schema)
-    })
+    val iter = new SerDeUtil.AutoBatchedPickler(
+      df.queryExecution.executedPlan.executeTake(n).iterator.map { row =>
+        EvaluatePython.toJava(row, df.schema)
+      })
     PythonRDD.serveIterator(iter, s"serve-DataFrame")
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
index 3d218f01c9eadf9c6f83f94e9d191af6d2f58249..8549a6a0f664319d6045f77e3b37f6700fc793e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
@@ -245,7 +245,7 @@ object SparkPlanTest {
          }
       }
     )
-    resolvedPlan.executeCollect().toSeq
+    resolvedPlan.executeCollectPublic().toSeq
   }
 
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c75025e79af4a0dfa0e6baf89282947d38b5c92f..17de8ef56f9a68772f28dc5b9b4293c39f998850 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -563,7 +563,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
 
   /** Extends QueryExecution with hive specific features. */
   protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
-    extends super.QueryExecution(logicalPlan) {
+    extends org.apache.spark.sql.execution.QueryExecution(this, logicalPlan) {
 
     /**
     * Returns the result as a hive compatible sequence of strings.  For native commands, the
@@ -581,10 +581,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
             .mkString("\t")
         }
       case command: ExecutedCommand =>
-        command.executeCollect().map(_(0).toString)
+        command.executeCollect().map(_.getString(0))
 
       case other =>
-        val result: Seq[Seq[Any]] = other.executeCollect().map(_.toSeq).toSeq
+        val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
         // We need the types so we can output struct field names
         val types = analyzed.output.map(_.dataType)
         // Reformat to match hive tab delimited output.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 0c700bdb370ac2ee7b6c19f4d4ade953a0c2e62e..f936cf565b2bc317d1f58d4aa98fbaedea71ca27 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -124,7 +124,7 @@ case class InsertIntoHiveTable(
    *
    * Note: this is run once and then kept to avoid double insertions.
    */
-  protected[sql] lazy val sideEffectResult: Seq[Row] = {
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
     // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
@@ -267,10 +267,10 @@ case class InsertIntoHiveTable(
     // however for now we return an empty list to simplify compatibility checks with hive, which
     // does not return anything for insert operations.
     // TODO: implement hive compatibility as rules.
-    Seq.empty[Row]
+    Seq.empty[InternalRow]
  }
 
-  override def executeCollect(): Array[Row] = sideEffectResult.toArray
+  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
 
   protected override def doExecute(): RDD[InternalRow] = {
     sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1)
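
Note (not part of the patch): the diff above moves the Catalyst-to-Scala conversion out of each operator's executeCollect()/executeTake() and concentrates it in the new executeCollectPublic() boundary. The sketch below is a minimal, standalone illustration of that conversion step, assuming Spark 1.5/1.6-era internal APIs (InternalRow, CatalystTypeConverters, UTF8String); because these are internal, the hypothetical object is placed in an org.apache.spark.sql sub-package. The object and package names are illustrative only.

```scala
package org.apache.spark.sql.sketch

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical demo object; not part of Spark or of this patch.
object ExecuteCollectPublicSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("age", IntegerType)))

    // What operators such as LocalTableScan now hand back from executeCollect():
    // raw Catalyst rows, with strings stored as UTF8String.
    val internalRows: Array[InternalRow] = Array(
      InternalRow(UTF8String.fromString("alice"), 30),
      InternalRow(UTF8String.fromString("bob"), 25))

    // What executeCollectPublic() layers on top: a single conversion pass from
    // InternalRow to the external Row type, driven by the plan's schema.
    val converter = CatalystTypeConverters.createToScalaConverter(schema)
    val publicRows: Array[Row] = internalRows.map(converter(_).asInstanceOf[Row])

    publicRows.foreach(r => println(s"${r.getString(0)} -> ${r.getInt(1)}"))
  }
}
```

Deferring the conversion this way lets purely internal consumers, such as ExecutedCommand and the Python takeAndServe() bridge changed above, stay in InternalRow and skip the round-trip entirely, while DataFrame.collect() still returns external Rows via executeCollectPublic().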