Commit 95f4fbae authored by Liwei Lin, committed by Shixiong Zhu

[SPARK-14942][SQL][STREAMING] Reduce delay between batch construction and execution

## Problem

Currently in `StreamExecution`, [we first run the batch, then construct the next](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L165):
```scala
if (dataAvailable) runBatch()
constructNextBatch()
```

This works well when we run batches ASAP, since data still gets processed in the **very next batch**:

![1](https://cloud.githubusercontent.com/assets/15843379/14779964/2786e698-0b0d-11e6-9d2c-bb41513488b2.png)

However, when we run batches on a trigger like `ProcessingTime("1 minute")`, data, such as _y_ below, may not get processed in the very next batch (_batch 1_) but only in _batch 2_:

![2](https://cloud.githubusercontent.com/assets/15843379/14779818/6f3bb064-0b0c-11e6-9f16-c1ce4897186b.png)
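To make the delay concrete, here is a minimal, self-contained sketch of the run-then-construct loop (a simulation for illustration only, not the actual `StreamExecution` code; the source, batch, and trigger objects are stand-ins). Data such as _y_ that arrives after a trigger's `constructNextBatch()` call is not captured until the following trigger, and is only run one trigger after that:

```scala
object RunThenConstructSketch extends App {
  var sourceData = Vector.empty[String]        // data currently available at the (simulated) source
  var constructedBatch = Vector.empty[String]  // batch built by the last constructNextBatch()

  def dataAvailable: Boolean = constructedBatch.nonEmpty
  def constructNextBatch(): Unit = { constructedBatch = sourceData; sourceData = Vector.empty }
  def runBatch(trigger: Int): Unit =
    println(s"trigger $trigger processed: ${constructedBatch.mkString(", ")}")

  sourceData :+= "x"
  // trigger 0: nothing has been constructed yet, so only construct (captures x)
  if (dataAvailable) runBatch(0)
  constructNextBatch()

  sourceData :+= "y"                           // y arrives midway through the 1-minute interval

  // trigger 1: run the batch built at trigger 0 (x only), and only then capture y
  if (dataAvailable) runBatch(1)
  constructNextBatch()

  // trigger 2: y is finally processed, nearly two full intervals after it arrived
  if (dataAvailable) runBatch(2)
  constructNextBatch()
}
```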

## What changes were proposed in this pull request?

This patch reverses the order of `constructNextBatch()` and `runBatch()`. After this patch, data gets processed in the **very next batch**, i.e. _batch 1_:

![3](https://cloud.githubusercontent.com/assets/15843379/14779816/6f36ee62-0b0c-11e6-9e53-bc8397fade18.png)
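Abridged from the `StreamExecution` hunk in the diff below, the per-trigger body now constructs the next batch first (or, on the very first trigger, recovers offsets), then runs it, and only afterwards advances the batch id:

```scala
triggerExecutor.execute(() => {
  if (isActive) {
    if (currentBatchId < 0) {
      // First trigger after a (re)start: recover offsets from the logs only once
      populateStartOffsets()
    } else {
      // Capture the latest available offsets before running anything
      constructNextBatch()
    }
    if (dataAvailable) {
      runBatch()
      // Advance the batch id only after this batch's data has been fully processed
      currentBatchId += 1
    }
    true
  } else {
    false
  }
})
```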

In addition, this patch changes when we do `currentBatchId += 1`: the increment now happens only after the current batch's data has been fully processed, so we no longer need to pass `currentBatchId + 1` or `currentBatchId - 1` to state stores or sinks.
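Concretely, the `IncrementalExecution` and `sink.addBatch` call sites in the diff below go from compensating for the already-advanced id to using it directly:

```scala
// Before: the id had already been advanced past the running batch, so call sites subtracted one:
//   val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId - 1)
//   sink.addBatch(currentBatchId - 1, nextBatch)

// After: currentBatchId still names the batch being run, so it is passed as-is:
val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId)
sink.addBatch(currentBatchId, nextBatch)
```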

## How was this patch tested?

A newly added test case. This should also be covered by existing test suites, e.g. stress tests and others.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #12725 from lw-lin/construct-before-run-3.
Parent commit: fabc8e5b
@@ -27,12 +27,12 @@ import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner,
  * A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]
  * plan incrementally. Possibly preserving state in between each execution.
  */
-class IncrementalExecution(
+class IncrementalExecution private[sql](
     sparkSession: SparkSession,
     logicalPlan: LogicalPlan,
     outputMode: OutputMode,
     checkpointLocation: String,
-    currentBatchId: Long)
+    val currentBatchId: Long)
   extends QueryExecution(sparkSession, logicalPlan) {
 
   // TODO: make this always part of planning.
@@ -57,7 +57,7 @@ class IncrementalExecution(
       case StateStoreSaveExec(keys, None,
              UnaryExecNode(agg,
                StateStoreRestoreExec(keys2, None, child))) =>
-        val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId - 1)
+        val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId)
         operatorId += 1
         StateStoreSaveExec(
@@ -122,7 +122,7 @@ class StreamExecution(
    * processing is done. Thus, the Nth record in this log indicated data that is currently being
    * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
    */
-  private val offsetLog =
+  private[sql] val offsetLog =
     new HDFSMetadataLog[CompositeOffset](sparkSession, checkpointFile("offsets"))
 
   /** Whether the query is currently active or not */
@@ -174,12 +174,21 @@ class StreamExecution(
       // While active, repeatedly attempt to run batches.
       SQLContext.setActive(sparkSession.wrapped)
-      populateStartOffsets()
-      logDebug(s"Stream running from $committedOffsets to $availableOffsets")
       triggerExecutor.execute(() => {
         if (isActive) {
-          if (dataAvailable) runBatch()
-          constructNextBatch()
+          if (currentBatchId < 0) {
+            // We'll do this initialization only once
+            populateStartOffsets()
+            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+          } else {
+            constructNextBatch()
+          }
+          if (dataAvailable) {
+            runBatch()
+            // We'll increase currentBatchId after we complete processing current batch's data
+            currentBatchId += 1
+          }
           true
         } else {
           false
@@ -214,7 +223,7 @@ class StreamExecution(
     offsetLog.getLatest() match {
       case Some((batchId, nextOffsets)) =>
         logInfo(s"Resuming continuous query, starting with batch $batchId")
-        currentBatchId = batchId + 1
+        currentBatchId = batchId
         availableOffsets = nextOffsets.toStreamProgress(sources)
         logDebug(s"Found possibly uncommitted offsets $availableOffsets")
@@ -285,7 +294,6 @@ class StreamExecution(
         offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
         s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
       }
-      currentBatchId += 1
       logInfo(s"Committed offsets for batch $currentBatchId.")
     } else {
       awaitBatchLock.lock()
@@ -352,7 +360,7 @@ class StreamExecution(
     val nextBatch =
       new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))
-    sink.addBatch(currentBatchId - 1, nextBatch)
+    sink.addBatch(currentBatchId, nextBatch)
 
     awaitBatchLock.lock()
     try {
@@ -124,6 +124,10 @@ class MemorySink(val schema: StructType) extends Sink with Logging {
     batches.flatten
   }
 
+  def latestBatchId: Option[Int] = synchronized {
+    if (batches.size == 0) None else Some(batches.size - 1)
+  }
+
   def lastBatch: Seq[Row] = synchronized { batches.last }
 
   def toDebugString: String = synchronized {
@@ -137,20 +137,88 @@ class StreamSuite extends StreamTest with SharedSQLContext {
     }
   }
 
-  // This would fail for now -- error is "Timed out waiting for stream"
-  // Root cause is that data generated in batch 0 may not get processed in batch 1
-  // Let's enable this after SPARK-14942: Reduce delay between batch construction and execution
-  ignore("minimize delay between batch construction and execution") {
+  test("minimize delay between batch construction and execution") {
+
+    // For each batch, we would retrieve new data's offsets and log them before we run the execution
+    // This checks whether the key of the offset log is the expected batch id
+    def CheckOffsetLogLatestBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId,
+        s"offsetLog's latest should be $expectedId")
+
+    // For each batch, we would log the state change during the execution
+    // This checks whether the key of the state change log is the expected batch id
+    def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.lastExecution.asInstanceOf[IncrementalExecution].currentBatchId == expectedId,
+        s"lastExecution's currentBatchId should be $expectedId")
+
+    // For each batch, we would log the sink change after the execution
+    // This checks whether the key of the sink change log is the expected batch id
+    def CheckSinkLatestBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.sink.asInstanceOf[MemorySink].latestBatchId.get == expectedId,
+        s"sink's lastBatchId should be $expectedId")
+
     val inputData = MemoryStream[Int]
     testStream(inputData.toDS())(
       StartStream(ProcessingTime("10 seconds"), new ManualClock),
+
       /* -- batch 0 ----------------------- */
-      AddData(inputData, 1),
-      AddData(inputData, 2),
-      AddData(inputData, 3),
+      // Add some data in batch 0
+      AddData(inputData, 1, 2, 3),
       AdvanceManualClock(10 * 1000), // 10 seconds
+
       /* -- batch 1 ----------------------- */
-      CheckAnswer(1, 2, 3))
+      // Check the results of batch 0
+      CheckAnswer(1, 2, 3),
+      CheckIncrementalExecutionCurrentBatchId(0),
+      CheckOffsetLogLatestBatchId(0),
+      CheckSinkLatestBatchId(0),
+
+      // Add some data in batch 1
+      AddData(inputData, 4, 5, 6),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch _ ----------------------- */
+      // Check the results of batch 1
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+
+      AdvanceManualClock(10 * 1000),
+      AdvanceManualClock(10 * 1000),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch __ ---------------------- */
+      // Check the results of batch 1 again; this is to make sure that, when there's no new data,
+      // the currentId does not get logged (e.g. as 2) even if the clock has advanced many times
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+
+      /* Stop then restart the Stream */
+      StopStream,
+      StartStream(ProcessingTime("10 seconds"), new ManualClock),
+
+      /* -- batch 1 rerun ----------------- */
+      // this batch 1 would re-run because the latest batch id logged in offset log is 1
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch 2 ----------------------- */
+      // Check the results of batch 1
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+
+      // Add some data in batch 2
+      AddData(inputData, 7, 8, 9),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch 3 ----------------------- */
+      // Check the results of batch 2
+      CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8, 9),
+      CheckIncrementalExecutionCurrentBatchId(2),
+      CheckOffsetLogLatestBatchId(2),
+      CheckSinkLatestBatchId(2))
   }
 }