From 7d2a7a91f263bb9fbf24dc4dbffde8fe5e2c7442 Mon Sep 17 00:00:00 2001
From: Michael Armbrust <michael@databricks.com>
Date: Wed, 27 Aug 2014 15:14:08 -0700
Subject: [PATCH] [SPARK-3235][SQL] Ensure in-memory tables don't always broadcast.

Author: Michael Armbrust <michael@databricks.com>

Closes #2147 from marmbrus/inMemDefaultSize and squashes the following commits:

5390360 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into inMemDefaultSize
14204d3 [Michael Armbrust] Set the context before creating SparkLogicalPlans.
8da4414 [Michael Armbrust] Make sure we throw errors when leaf nodes fail to provide statistics
18ce029 [Michael Armbrust] Ensure in-memory tables don't always broadcast.
---
 .../sql/catalyst/plans/logical/LogicalPlan.scala   | 14 ++++++++------
 .../scala/org/apache/spark/sql/SQLContext.scala    |  4 +++-
 .../sql/columnar/InMemoryColumnarTableScan.scala   |  3 +++
 .../org/apache/spark/sql/execution/SparkPlan.scala |  2 +-
 .../sql/columnar/InMemoryColumnarQuerySuite.scala  |  8 ++++++++
 5 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 8616ac45b0..f81d911194 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -41,9 +41,14 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
   case class Statistics(
     sizeInBytes: BigInt
   )
-  lazy val statistics: Statistics = Statistics(
-    sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product
-  )
+  lazy val statistics: Statistics = {
+    if (children.size == 0) {
+      throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
+    }
+
+    Statistics(
+      sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
+  }
 
   /**
    * Returns the set of attributes that this node takes as
@@ -117,9 +122,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
  */
 abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
   self: Product =>
-
-  override lazy val statistics: Statistics =
-    throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6f0eed3f63..a75af94d29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -89,8 +89,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
+  implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) = {
+    SparkPlan.currentContext.set(self)
     new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self))
+  }
 
   /**
    * :: DeveloperApi ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 24e88eea31..bc36bacd00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -39,6 +39,9 @@ private[sql] case class InMemoryRelation(
     (private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null)
   extends LogicalPlan with MultiInstanceRelation {
 
+  override lazy val statistics =
+    Statistics(sizeInBytes = child.sqlContext.defaultSizeInBytes)
+
   // If the cached column buffers were not passed in, we calculate them in the constructor.
   // As in Spark, the actual work of caching is lazy.
   if (_cachedColumnBuffers == null) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 7d33ea5b02..2b8913985b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -49,7 +49,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * populated by the query planning infrastructure.
    */
   @transient
-  protected val sqlContext = SparkPlan.currentContext.get()
+  protected[spark] val sqlContext = SparkPlan.currentContext.get()
 
   protected def sparkContext = sqlContext.sparkContext
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 736c0f8571..fdd2799a53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -33,6 +33,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
     checkAnswer(scan, testData.collect().toSeq)
   }
 
+  test("default size avoids broadcast") {
+    // TODO: Improve this test when we have better statistics
+    sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).registerTempTable("sizeTst")
+    cacheTable("sizeTst")
+    assert(
+      table("sizeTst").queryExecution.logical.statistics.sizeInBytes > autoBroadcastJoinThreshold)
+  }
+
   test("projection") {
     val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
     val scan = InMemoryRelation(useCompression = true, 5, plan)
--
GitLab
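
Why the old default was dangerous, as a minimal standalone sketch (not taken from the patch; the object name is hypothetical and only the Scala standard library is assumed): LogicalPlan.statistics estimates a node's size as the product of its children's sizes, and the product of an empty sequence is the multiplicative identity, 1. InMemoryRelation has no children and previously inherited that default, so every cached table reported a size of 1 byte, fell under spark.sql.autoBroadcastJoinThreshold, and was always chosen for a broadcast join.

// Sketch only: a childless plan node without an explicit statistics
// override used to look like a 1-byte table.
object EmptyProductPitfall extends App {
  // A leaf node has no children, so its list of child sizes is empty.
  val childSizes: Seq[BigInt] = Seq.empty

  // Seq#product of an empty sequence is 1 (the multiplicative identity),
  // i.e. a "1 byte" table, below any sensible broadcast threshold.
  println(childSizes.product)  // prints 1
}

The patch closes the hole from both ends: a childless LogicalPlan that fails to override statistics now throws UnsupportedOperationException instead of silently reporting size 1, and InMemoryRelation supplies a deliberately conservative default, child.sqlContext.defaultSizeInBytes, which the new test asserts is larger than autoBroadcastJoinThreshold.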