Commit 6d0bfb96 authored by Reynold Xin

Small documentation and style fix.

parent 6cb8f836
@@ -31,5 +31,6 @@ package org.apache.spark.sql.catalyst.plans.logical
  *
  * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
  *                    defaults to the product of children's `sizeInBytes`.
+ * @param isBroadcastable If true, output is small enough to be used in a broadcast join.
  */
-private[sql] case class Statistics(sizeInBytes: BigInt, isBroadcastable: Boolean = false)
+case class Statistics(sizeInBytes: BigInt, isBroadcastable: Boolean = false)
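As context for the documentation change above, `isBroadcastable` is simply a flag on a plan's statistics that marks its output as small enough for a broadcast join. A minimal, hypothetical sketch of constructing `Statistics` values (not part of this commit) could look like this:

import org.apache.spark.sql.catalyst.plans.logical.Statistics

// Hypothetical sketch, not from this commit: a relation whose estimated
// physical size is known and which is small enough to broadcast.
val broadcastableStats = Statistics(sizeInBytes = BigInt(64 * 1024), isBroadcastable = true)

// isBroadcastable defaults to false, so plain size-based statistics stay non-broadcastable.
val defaultStats = Statistics(sizeInBytes = BigInt(1) << 30)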
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.joins
 import scala.reflect.ClassTag
 
-import org.scalatest.BeforeAndAfterAll
-
 import org.apache.spark.{AccumulatorSuite, SparkConf, SparkContext}
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession}
 import org.apache.spark.sql.execution.exchange.EnsureRequirements
@@ -68,10 +66,11 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
     }
   }
-  private def testBroadcastJoin[T: ClassTag](joinType: String,
-      forceBroadcast: Boolean = false): SparkPlan = {
+  private def testBroadcastJoin[T: ClassTag](
+      joinType: String,
+      forceBroadcast: Boolean = false): SparkPlan = {
     val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
-    var df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
+    val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
     // Comparison at the end is for broadcast left semi join
     val joinExpression = df1("key") === df2("key") && df1("value") > df2("value")
@@ -80,11 +79,9 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
     } else {
       df1.join(df2, joinExpression, joinType)
     }
-    val plan =
-      EnsureRequirements(spark.sessionState.conf).apply(df3.queryExecution.sparkPlan)
+    val plan = EnsureRequirements(spark.sessionState.conf).apply(df3.queryExecution.sparkPlan)
     assert(plan.collect { case p: T => p }.size === 1)
-    return plan
+    plan
   }
   test("unsafe broadcast hash join updates peak execution memory") {
...
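For reference, a hypothetical example (not part of this diff) of how the reformatted helper is typically invoked from a test case inside this suite, assuming `BroadcastHashJoinExec` from org.apache.spark.sql.execution.joins is in scope:

// Hypothetical usage sketch inside BroadcastJoinSuite (not part of this diff):
// plan an inner join with broadcasting forced and assert that exactly one
// BroadcastHashJoinExec node appears in the resulting SparkPlan.
test("inner join is planned as a broadcast hash join when forced") {
  testBroadcastJoin[BroadcastHashJoinExec]("inner", forceBroadcast = true)
}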