Skip to content
Snippets Groups Projects
Commit c1a0c66b authored by Liwei Lin's avatar Liwei Lin Committed by Reynold Xin
Browse files

[SPARK-18261][STRUCTURED STREAMING] Add statistics to MemorySink for joining

## What changes were proposed in this pull request?

Right now, there is no way to join the output of a memory sink with any table:

> UnsupportedOperationException: LeafNode MemoryPlan must implement statistics

This patch adds statistics to MemorySink, making joining snapshots of memory streams with tables possible.

## How was this patch tested?

Added a test case.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #15786 from lw-lin/memory-sink-stat.
parent 9b0593d5
No related branches found
No related tags found
No related merge requests found
......@@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
......@@ -212,4 +212,8 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
*/
case class MemoryPlan(sink: MemorySink, output: Seq[Attribute]) extends LeafNode {
def this(sink: MemorySink) = this(sink, sink.schema.toAttributes)
private val sizePerRow = sink.schema.toAttributes.map(_.dataType.defaultSize).sum
override def statistics: Statistics = Statistics(sizePerRow * sink.allData.size)
}
......@@ -187,6 +187,22 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
query.stop()
}
test("MemoryPlan statistics") {
implicit val schema = new StructType().add(new StructField("value", IntegerType))
val sink = new MemorySink(schema, InternalOutputModes.Append)
val plan = new MemoryPlan(sink)
// Before adding data, check output
checkAnswer(sink.allData, Seq.empty)
assert(plan.statistics.sizeInBytes === 0)
sink.addBatch(0, 1 to 3)
assert(plan.statistics.sizeInBytes === 12)
sink.addBatch(1, 4 to 6)
assert(plan.statistics.sizeInBytes === 24)
}
ignore("stress test") {
// Ignore the stress test as it takes several minutes to run
(0 until 1000).foreach { _ =>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment