Commit c6e041d1 authored by Michael Armbrust, committed by Reynold Xin

[SQL] Simple framework for debugging query execution

For now this records only the number of tuples and the set of unique data types output by each operator.

Example:
```scala
scala> import org.apache.spark.sql.execution.debug._
scala> hql("SELECT value FROM src WHERE key > 10").debug(sparkContext)

Results returned: 489
== Project [value#1:0] ==
Tuples output: 489
 value StringType: {java.lang.String}
== Filter (key#0:1 > 10) ==
Tuples output: 489
 value StringType: {java.lang.String}
 key IntegerType: {java.lang.Integer}
== HiveTableScan [value#1,key#0], (MetastoreRelation default, src, None), None ==
Tuples output: 500
 value StringType: {java.lang.String}
 key IntegerType: {java.lang.Integer}
```
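The per-column type summaries shown above are collected with a set-valued accumulator: each task adds the runtime class name of every value it sees, and the driver reads back the union. Below is a minimal, self-contained sketch of that mechanism; the object and variable names are illustrative and not part of the patch, and it assumes the Spark 1.x-era `AccumulatorParam` API that the patch itself uses.

```scala
import scala.collection.mutable.HashSet
import org.apache.spark.{Accumulator, AccumulatorParam, SparkConf, SparkContext}

object AccumulatorSketch {
  // Merge HashSet[String] accumulators by set union, mirroring the
  // SetAccumulatorParam defined in the patch below.
  implicit object SetAccumParam extends AccumulatorParam[HashSet[String]] {
    def zero(initial: HashSet[String]): HashSet[String] = new HashSet[String]()
    def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
      v1 ++= v2
      v1
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("debug-sketch"))
    // One accumulator per output column; tasks add class names as rows flow past.
    val elementTypes: Accumulator[HashSet[String]] = sc.accumulator(HashSet.empty[String])
    sc.parallelize(Seq[Any]("spark", "sql", 42)).foreach { value =>
      elementTypes += HashSet(value.getClass.getName)
    }
    // Driver-side read of the union, formatted like the debug output above.
    println(elementTypes.value.mkString("{", ",", "}")) // e.g. {java.lang.String,java.lang.Integer}
    sc.stop()
  }
}
```

Because the merge is a set union, the result is independent of partitioning and task order, which is what makes the `{java.lang.String}`-style summaries in the output above reliable.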

Author: Michael Armbrust <michael@databricks.com>

Closes #1005 from marmbrus/debug and squashes the following commits:

dcc3ca6 [Michael Armbrust] Add comments.
c9dded2 [Michael Armbrust] Simple framework for debugging query execution
Parent: e2734476
@@ -285,11 +285,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
         |== Physical Plan ==
         |${stringOrError(executedPlan)}
       """.stripMargin.trim
-
-    /**
-     * Runs the query after interposing operators that print the result of each intermediate step.
-     */
-    def debugExec() = DebugQuery(executedPlan).execute().collect()
   }

   /**

sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala (deleted file)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

private[sql] object DebugQuery {
  def apply(plan: SparkPlan): SparkPlan = {
    val visited = new collection.mutable.HashSet[Long]()
    plan transform {
      case s: SparkPlan if !visited.contains(s.id) =>
        visited += s.id
        DebugNode(s)
    }
  }
}

private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
  def references = Set.empty
  def output = child.output
  def execute() = {
    val childRdd = child.execute()
    println(
      s"""
        |=========================
        |${child.simpleString}
        |=========================
      """.stripMargin)
    childRdd.foreach(println(_))
    childRdd
  }
}

sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala (new file)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import scala.collection.mutable.HashSet

import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext._
import org.apache.spark.sql.{SchemaRDD, Row}

/**
 * :: DeveloperApi ::
 * Contains methods for debugging query execution.
 *
 * Usage:
 * {{{
 *   sql("SELECT key FROM src").debug
 * }}}
 */
package object debug {

  /**
   * :: DeveloperApi ::
   * Augments SchemaRDDs with debug methods.
   */
  @DeveloperApi
  implicit class DebugQuery(query: SchemaRDD) {
    def debug(implicit sc: SparkContext): Unit = {
      val plan = query.queryExecution.executedPlan
      val visited = new collection.mutable.HashSet[Long]()
      val debugPlan = plan transform {
        case s: SparkPlan if !visited.contains(s.id) =>
          visited += s.id
          DebugNode(sc, s)
      }
      println(s"Results returned: ${debugPlan.execute().count()}")
      debugPlan.foreach {
        case d: DebugNode => d.dumpStats()
        case _ =>
      }
    }
  }

  private[sql] case class DebugNode(
      @transient sparkContext: SparkContext,
      child: SparkPlan) extends UnaryNode {
    def references = Set.empty
    def output = child.output

    implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] {
      def zero(initialValue: HashSet[String]): HashSet[String] = {
        initialValue.clear()
        initialValue
      }

      def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
        v1 ++= v2
        v1
      }
    }

    /**
     * A collection of stats for each column of output.
     * @param elementTypes the actual runtime types for the output. Useful when there are bugs
     *                     causing the wrong data to be projected.
     */
    case class ColumnStat(
        elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty))

    val tupleCount = sparkContext.accumulator[Int](0)
    val numColumns = child.output.size
    val columnStats = Array.fill(child.output.size)(new ColumnStat())

    def dumpStats(): Unit = {
      println(s"== ${child.simpleString} ==")
      println(s"Tuples output: ${tupleCount.value}")
      child.output.zip(columnStats).foreach { case (attr, stat) =>
        val actualDataTypes = stat.elementTypes.value.mkString("{", ",", "}")
        println(s" ${attr.name} ${attr.dataType}: $actualDataTypes")
      }
    }

    def execute() = {
      child.execute().mapPartitions { iter =>
        new Iterator[Row] {
          def hasNext = iter.hasNext
          def next() = {
            val currentRow = iter.next()
            tupleCount += 1
            var i = 0
            while (i < numColumns) {
              val value = currentRow(i)
              columnStats(i).elementTypes += HashSet(value.getClass.getName)
              i += 1
            }
            currentRow
          }
        }
      }
    }
  }
}