Skip to content
Snippets Groups Projects
Commit 1b2785c3 authored by Shixiong Zhu, committed by Tathagata Das
Browse files

[SPARK-18729][SS] Move DataFrame.collect out of synchronized block in MemorySink

## What changes were proposed in this pull request?

Move `DataFrame.collect` out of the synchronized block so that the content of MemorySink can be queried while `DataFrame.collect` is running.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16162 from zsxwing/SPARK-18729.
parent 3ba69b64
No related branches found
No related tags found
No related merge requests found
......@@ -186,16 +186,23 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
}.mkString("\n")
}
override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
if (latestBatchId.isEmpty || batchId > latestBatchId.get) {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
val notCommitted = synchronized {
latestBatchId.isEmpty || batchId > latestBatchId.get
}
if (notCommitted) {
logDebug(s"Committing batch $batchId to $this")
outputMode match {
case InternalOutputModes.Append | InternalOutputModes.Update =>
batches.append(AddedData(batchId, data.collect()))
val rows = AddedData(batchId, data.collect())
synchronized { batches += rows }
case InternalOutputModes.Complete =>
batches.clear()
batches += AddedData(batchId, data.collect())
val rows = AddedData(batchId, data.collect())
synchronized {
batches.clear()
batches += rows
}
case _ =>
throw new IllegalArgumentException(
......@@ -206,7 +213,7 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
}
}
def clear(): Unit = {
def clear(): Unit = synchronized {
batches.clear()
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment