diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 746bb55e14f22d56bc78f61f38eb09341cf41d75..78ab475eb466b20f9ec99f137c22a1556229e15c 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -213,7 +213,7 @@ class DataFrame(object):
 
         >>> df.explain()
         == Physical Plan ==
-        Scan PhysicalRDD[age#0,name#1]
+        Scan ExistingRDD[age#0,name#1]
 
         >>> df.explain(True)
         == Parsed Logical Plan ==
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 623348f6768a4aa25489f85f55933af47347a2f2..b8a43025882e5e83c4c123f3001daa629c8f956b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -97,22 +97,31 @@ private[sql] case class LogicalRDD(
 private[sql] case class PhysicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
-    extraInformation: String,
+    override val nodeName: String,
+    override val metadata: Map[String, String] = Map.empty,
     override val outputsUnsafeRows: Boolean = false)
   extends LeafNode {
 
   protected override def doExecute(): RDD[InternalRow] = rdd
 
-  override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]")
+  override def simpleString: String = {
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+    s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
+  }
 }
 
 private[sql] object PhysicalRDD {
+  // Metadata keys
+  val INPUT_PATHS = "InputPaths"
+  val PUSHED_FILTERS = "PushedFilters"
+
   def createFromDataSource(
       output: Seq[Attribute],
       rdd: RDD[InternalRow],
       relation: BaseRelation,
-      extraInformation: String = ""): PhysicalRDD = {
-    PhysicalRDD(output, rdd, relation.toString + extraInformation,
-      relation.isInstanceOf[HadoopFsRelation])
+      metadata: Map[String, String] = Map.empty): PhysicalRDD = {
+    // All HadoopFsRelations output UnsafeRows
+    val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation]
+    PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index a78177751c9dc0f659e1dde19856f50664346e95..ec98f81041343d730a81a0cec93a3cea994109bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -67,6 +67,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     super.makeCopy(newArgs)
   }
 
+  /**
+   * Return all metadata that describes more details of this SparkPlan.
+   */
+  private[sql] def metadata: Map[String, String] = Map.empty
+
   /**
    * Return all metrics containing metrics of this SparkPlan.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 486ce34064e431d1237b19203bdcd1e599a45278..4f750ad13ab844896857df611fd50f4a5a811b0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -30,6 +30,7 @@ class SparkPlanInfo(
     val nodeName: String,
     val simpleString: String,
     val children: Seq[SparkPlanInfo],
+    val metadata: Map[String, String],
     val metrics: Seq[SQLMetricInfo])
 
 private[sql] object SparkPlanInfo {
@@ -41,6 +42,6 @@ private[sql] object SparkPlanInfo {
     }
     val children = plan.children.map(fromSparkPlan)
 
-    new SparkPlanInfo(plan.nodeName, plan.simpleString, children, metrics)
+    new SparkPlanInfo(plan.nodeName, plan.simpleString, children, plan.metadata, metrics)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f67c951bc0663cd16d22de9275717307657155ad..25e98c0bdd4310828c4993437415a50719ec2956 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -363,7 +363,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
       case e @ EvaluatePython(udf, child, _) =>
         BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
-      case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "PhysicalRDD") :: Nil
+      case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "ExistingRDD") :: Nil
       case BroadcastHint(child) => apply(child)
       case _ => Nil
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 544d5eccec037828736e68a936c1468eb5f7159a..8a15a51d825ef5232c6daeec46a3d6f204949049 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
@@ -25,6 +27,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
+import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
@@ -315,7 +318,20 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
-    val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ")
+    val metadata: Map[String, String] = {
+      val pairs = ArrayBuffer.empty[(String, String)]
+
+      if (pushedFilters.nonEmpty) {
+        pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
+      }
+
+      relation.relation match {
+        case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.paths.mkString(", ")
+        case _ =>
+      }
+
+      pairs.toMap
+    }
 
     if (projects.map(_.toAttribute) == projects &&
       projectSet.size == projects.size &&
@@ -334,7 +350,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, pushedFiltersString)
+        relation.relation, metadata)
       filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -344,7 +360,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, pushedFiltersString)
+        relation.relation, metadata)
       execution.Project(
         projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index bb3e2786978c54e18855ff3bdbc7d3ddd31dde90..1af2a394f399a362b697756c1e4bb282d7e08d2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -146,6 +146,12 @@ private[sql] class ParquetRelation(
     meta
   }
 
+  override def toString: String = {
+    parameters.get(ParquetRelation.METASTORE_TABLE_NAME).map { tableName =>
+      s"${getClass.getSimpleName}: $tableName"
+    }.getOrElse(super.toString)
+  }
+
   override def equals(other: Any): Boolean = other match {
     case that: ParquetRelation =>
       val schemaEquality = if (shouldMergeSchemas) {
@@ -521,6 +527,10 @@ private[sql] object ParquetRelation extends Logging {
   // internally.
   private[sql] val METASTORE_SCHEMA = "metastoreSchema"
 
+  // If a ParquetRelation is converted from a Hive metastore table, this option is set to the
+  // original Hive table name.
+  private[sql] val METASTORE_TABLE_NAME = "metastoreTableName"
+
   /**
    * If parquet's block size (row group size) setting is larger than the min split size,
    * we use parquet's block size setting as the min split size. Otherwise, we will create
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 7af0ff09c5c6d35a54330eac41267ea046098703..3a6eff93998250c31d72d5068600e2c97e296be9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -66,7 +66,9 @@ private[sql] object SparkPlanGraph {
         SQLMetrics.getMetricParam(metric.metricParam))
     }
     val node = SparkPlanGraphNode(
-      nodeIdGenerator.getAndIncrement(), planInfo.nodeName, planInfo.simpleString, metrics)
+      nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
+      planInfo.simpleString, planInfo.metadata, metrics)
+
     nodes += node
     val childrenNodes = planInfo.children.map(
       child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
@@ -85,26 +87,33 @@ private[sql] object SparkPlanGraph {
  * @param metrics metrics that this SparkPlan node will track
  */
 private[ui] case class SparkPlanGraphNode(
-    id: Long, name: String, desc: String, metrics: Seq[SQLPlanMetric]) {
+    id: Long,
+    name: String,
+    desc: String,
+    metadata: Map[String, String],
+    metrics: Seq[SQLPlanMetric]) {
 
   def makeDotNode(metricsValue: Map[Long, String]): String = {
-    val values = {
-      for (metric <- metrics;
-        value <- metricsValue.get(metric.accumulatorId)) yield {
-        metric.name + ": " + value
-      }
+    val builder = new mutable.StringBuilder(name)
+
+    val values = for {
+      metric <- metrics
+      value <- metricsValue.get(metric.accumulatorId)
+    } yield {
+      metric.name + ": " + value
     }
-    val label = if (values.isEmpty) {
-      name
-    } else {
-      // If there are metrics, display all metrics in a separate line. We should use an escaped
-      // "\n" here to follow the dot syntax.
-      //
-      // Note: whitespace between two "\n"s is to create an empty line between the name of
-      // SparkPlan and metrics. If removing it, it won't display the empty line in UI.
-      name + "\\n \\n" + values.mkString("\\n")
-    }
-    s""" $id [label="$label"];"""
+
+    if (values.nonEmpty) {
+      // If there are metrics, display each entry in a separate line. We should use an escaped
+      // "\n" here to follow the dot syntax.
+      //
+      // Note: whitespace between two "\n"s is to create an empty line between the name of
+      // SparkPlan and metrics. If removing it, it won't display the empty line in UI.
+      builder ++= "\\n \\n"
+      builder ++= values.mkString("\\n")
+    }
+
+    s""" $id [label="${builder.toString()}"];"""
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 9ace25dc7d21b51c8e08f487a7faf1ea75230ffa..fc8ce6901dfca4089d43223383ca5307bbf4bdc2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -422,7 +422,7 @@ abstract class HadoopFsRelation private[sql](
     parameters: Map[String, String])
   extends BaseRelation with FileRelation with Logging {
 
-  override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
+  override def toString: String = getClass.getSimpleName
 
   def this() = this(None, Map.empty[String, String])
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index a4626259b28231a7eb283f8ceb12d085996ff320..2fb439f50117a658d2a79d20d4445147c9e3b19c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -169,7 +169,7 @@ class PlannerSuite extends SharedSQLContext {
 
       withTempTable("testPushed") {
         val exp = sql("select * from testPushed where key = 15").queryExecution.executedPlan
-        assert(exp.toString.contains("PushedFilter: [EqualTo(key,15)]"))
+        assert(exp.toString.contains("PushedFilters: [EqualTo(key,15)]"))
       }
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 9a981d02ad67c4ca11ab907a6c17c864e303089e..08b291e088238f5382d1f33a511912fa0e29978e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -411,7 +411,12 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     // evil case insensitivity issue, which is reconciled within `ParquetRelation`.
     val parquetOptions = Map(
       ParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
-      ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
+      ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
+      ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
+        metastoreRelation.tableName,
+        Some(metastoreRelation.databaseName)
+      ).unquotedString
+    )
     val tableIdentifier =
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
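
Note (illustration only, not part of the patch): the standalone sketch below mirrors the metadata rendering introduced in PhysicalRDD.simpleString above, so a reviewer can see how the PushedFilters/InputPaths entries end up in the plan string and in the UI node label. The node name, attribute names, filter, and input path used here are made-up example values, not output produced by the patch.

object SimpleStringSketch {
  // Mirrors PhysicalRDD.simpleString from the patch: metadata entries are sorted by key
  // and appended after the output attribute list.
  def render(nodeName: String, output: Seq[String], metadata: Map[String, String]): String = {
    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
    s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical values; in Spark these would come from DataSourceStrategy via
    // PhysicalRDD.createFromDataSource.
    val rendered = render(
      "ParquetRelation: default.src",
      Seq("key#0", "value#1"),
      Map(
        "PushedFilters" -> "[EqualTo(key,15)]",
        "InputPaths" -> "file:/tmp/warehouse/src"))
    println(rendered)
    // Prints:
    // Scan ParquetRelation: default.src[key#0,value#1] InputPaths: file:/tmp/warehouse/src, PushedFilters: [EqualTo(key,15)]
  }
}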