diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 0f3480c2391879e1f561dd8ca1454f7e5e849c31..47d5a6a43a84dbc3379d4bcf6e3922f6f561ebc7 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -212,8 +212,7 @@ class DataFrame(object): :param extended: boolean, default ``False``. If ``False``, prints only the physical plan. >>> df.explain() - PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at applySchemaToPythonRDD at\ - NativeMethodAccessorImpl.java:... + Scan PhysicalRDD[age#0,name#1] >>> df.explain(True) == Parsed Logical Plan == @@ -224,7 +223,6 @@ class DataFrame(object): ... == Physical Plan == ... - == RDD == """ if extended: print(self._jdf.queryExecution().toString()) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 39f99700c8a26fb17b55215939be364fd17cb492..946c5a9c04f14d1e0afb4be8f38efd85a965717a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -107,6 +107,8 @@ object Cast { case class Cast(child: Expression, dataType: DataType) extends UnaryExpression with CodegenFallback { + override def toString: String = s"cast($child as ${dataType.simpleString})" + override def checkInputDataTypes(): TypeCheckResult = { if (Cast.canCast(child.dataType, dataType)) { TypeCheckResult.TypeCheckSuccess @@ -118,8 +120,6 @@ case class Cast(child: Expression, dataType: DataType) override def nullable: Boolean = Cast.forceNullable(child.dataType, dataType) || child.nullable - override def toString: String = s"CAST($child, $dataType)" - // [[func]] assumes the input is no longer null because eval already does the null check. @inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T]) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala index 4abfdfe87d5e9733a6ef3c76dc5ea407894fb4e2..576d8c7a3a68acfdf285c340f8bd177ba3e19029 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala @@ -93,7 +93,7 @@ private[sql] case class AggregateExpression2( AttributeSet(childReferences) } - override def toString: String = s"(${aggregateFunction}2,mode=$mode,isDistinct=$isDistinct)" + override def toString: String = s"(${aggregateFunction},mode=$mode,isDistinct=$isDistinct)" } abstract class AggregateFunction2 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 075c0ea2544b255529dd39087a3e02052acfa90c..832572571cabdc63a3855463ac293fde1e3ae0d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -1011,9 +1011,6 @@ class SQLContext(@transient val sparkContext: SparkContext) def output = analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ") - // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)}) - // however, the `toRdd` will cause the real execution, which is not what we want. - // We need to think about how to avoid the side effect. s"""== Parsed Logical Plan == |${stringOrError(logical)} |== Analyzed Logical Plan == @@ -1024,7 +1021,6 @@ class SQLContext(@transient val sparkContext: SparkContext) |== Physical Plan == |${stringOrError(executedPlan)} |Code Generation: ${stringOrError(executedPlan.codegenEnabled)} - |== RDD == """.stripMargin.trim } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index fbaa8e276ddb7d22abb9dca8e440db4e1895e1b0..cae7ca5cbdc888cbfc570f1a3db7752a65663bf1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} +import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.DataType import org.apache.spark.sql.{Row, SQLContext} @@ -95,11 +96,23 @@ private[sql] case class LogicalRDD( /** Physical plan node for scanning data from an RDD. */ private[sql] case class PhysicalRDD( output: Seq[Attribute], - rdd: RDD[InternalRow]) extends LeafNode { + rdd: RDD[InternalRow], + extraInformation: String) extends LeafNode { override protected[sql] val trackNumOfRowsEnabled = true protected override def doExecute(): RDD[InternalRow] = rdd + + override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]") +} + +private[sql] object PhysicalRDD { + def createFromDataSource( + output: Seq[Attribute], + rdd: RDD[InternalRow], + relation: BaseRelation): PhysicalRDD = { + PhysicalRDD(output, rdd, relation.toString) + } } /** Logical plan node for scanning data from a local collection. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index c5aaebe6732252b68e934fd6682df55b0fc40ed3..c4b9b5acea4de4f968355de2361364327e2f4171 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -363,12 +363,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Generate( generator, join = join, outer = outer, g.output, planLater(child)) :: Nil case logical.OneRowRelation => - execution.PhysicalRDD(Nil, singleRowRdd) :: Nil + execution.PhysicalRDD(Nil, singleRowRdd, "OneRowRelation") :: Nil case logical.RepartitionByExpression(expressions, child) => execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil case e @ EvaluatePython(udf, child, _) => BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil - case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil + case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "PhysicalRDD") :: Nil case BroadcastHint(child) => apply(child) case _ => Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 5a0b4d47d62f8dc10c16e3a495911581dea3dbe0..c3dcbd2b71ee8d703a46a72c28c31ea7bfb9c2ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -93,10 +93,13 @@ case class TungstenAggregate( val allAggregateExpressions = nonCompleteAggregateExpressions ++ completeAggregateExpressions testFallbackStartsAt match { - case None => s"TungstenAggregate ${groupingExpressions} ${allAggregateExpressions}" + case None => + val keyString = groupingExpressions.mkString("[", ",", "]") + val valueString = allAggregateExpressions.mkString("[", ",", "]") + s"TungstenAggregate(key=$keyString, value=$valueString" case Some(fallbackStartsAt) => - s"TungstenAggregateWithControlledFallback ${groupingExpressions} " + - s"${allAggregateExpressions} fallbackStartsAt=$fallbackStartsAt" + s"TungstenAggregateWithControlledFallback $groupingExpressions " + + s"$allAggregateExpressions fallbackStartsAt=$fallbackStartsAt" } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index e5dc676b878416c286f6f891b3adaa4e577e6929..5b5fa8c93ec5269b9635a220526cd8700e61484f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -101,8 +101,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil - case l @ LogicalRelation(t: TableScan) => - execution.PhysicalRDD(l.output, toCatalystRDD(l, t.buildScan())) :: Nil + case l @ LogicalRelation(baseRelation: TableScan) => + execution.PhysicalRDD.createFromDataSource( + l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil case i @ logical.InsertIntoTable( l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty => @@ -169,7 +170,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows) } - execution.PhysicalRDD(projections.map(_.toAttribute), unionedRows) + execution.PhysicalRDD.createFromDataSource( + projections.map(_.toAttribute), + unionedRows, + logicalRelation.relation) } // TODO: refactor this thing. It is very complicated because it does projection internally. @@ -299,14 +303,18 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { projects.asInstanceOf[Seq[Attribute]] // Safe due to if above. .map(relation.attributeMap) // Match original case of attributes. - val scan = execution.PhysicalRDD(projects.map(_.toAttribute), - scanBuilder(requestedColumns, pushedFilters)) + val scan = execution.PhysicalRDD.createFromDataSource( + projects.map(_.toAttribute), + scanBuilder(requestedColumns, pushedFilters), + relation.relation) filterCondition.map(execution.Filter(_, scan)).getOrElse(scan) } else { val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq - val scan = execution.PhysicalRDD(requestedColumns, - scanBuilder(requestedColumns, pushedFilters)) + val scan = execution.PhysicalRDD.createFromDataSource( + requestedColumns, + scanBuilder(requestedColumns, pushedFilters), + relation.relation) execution.Project(projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index c04557e5a0818416c198755f8455c4976aa57863..0b2929661b657a10ef7e24d3222c93d53c5255c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -383,7 +383,7 @@ private[sql] abstract class OutputWriterInternal extends OutputWriter { abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec]) extends BaseRelation with Logging { - logInfo("Constructing HadoopFsRelation") + override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]") def this() = this(None) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala index 8208b25b5708cce034bb2c3c4aa5eac2dcd1afd0..322966f423784a01eba15e846ef63ea588529dbf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala @@ -32,9 +32,9 @@ class RowFormatConvertersSuite extends SparkPlanTest { case c: ConvertToSafe => c } - private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null)) + private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null, "name")) assert(!outputsSafe.outputsUnsafeRows) - private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null)) + private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null, "name")) assert(outputsUnsafe.outputsUnsafeRows) test("planner should insert unsafe->safe conversions when required") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala index 697211222b90cddda056bad47220bda4e5d7276f..8215dd6c2e7112af73d38039e034db66bbb42141 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala @@ -36,7 +36,7 @@ class HiveExplainSuite extends QueryTest { "== Analyzed Logical Plan ==", "== Optimized Logical Plan ==", "== Physical Plan ==", - "Code Generation", "== RDD ==") + "Code Generation") } test("explain create table command") {