Skip to content
Snippets Groups Projects
Commit c1e87e38 authored by Tathagata Das's avatar Tathagata Das
Browse files

[SPARK-20030][SS] Event-time-based timeout for MapGroupsWithState

## What changes were proposed in this pull request?

Adding event time based timeout. The user sets the timeout timestamp directly using `KeyedState.setTimeoutTimestamp`. The keys times out when the watermark crosses the timeout timestamp.

## How was this patch tested?
Unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17361 from tdas/SPARK-20030.
parent 2d73fcce
No related merge requests found
Showing
with 616 additions and 268 deletions
......@@ -19,9 +19,7 @@ package org.apache.spark.sql.streaming;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.catalyst.plans.logical.NoTimeout$;
import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout;
import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout$;
import org.apache.spark.sql.catalyst.plans.logical.*;
/**
* Represents the type of timeouts possible for the Dataset operations
......@@ -34,9 +32,23 @@ import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout$;
@InterfaceStability.Evolving
public class KeyedStateTimeout {
/** Timeout based on processing time. */
/**
* Timeout based on processing time. The duration of timeout can be set for each group in
* `map/flatMapGroupsWithState` by calling `KeyedState.setTimeoutDuration()`. See documentation
* on `KeyedState` for more details.
*/
public static KeyedStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; }
/** No timeout */
/**
* Timeout based on event-time. The event-time timestamp for timeout can be set for each
* group in `map/flatMapGroupsWithState` by calling `KeyedState.setTimeoutTimestamp()`.
* In addition, you have to define the watermark in the query using `Dataset.withWatermark`.
* When the watermark advances beyond the set timestamp of a group and the group has not
* received any data, then the group times out. See documentation on
* `KeyedState` for more details.
*/
public static KeyedStateTimeout EventTimeTimeout() { return EventTimeTimeout$.MODULE$; }
/** No timeout. */
public static KeyedStateTimeout NoTimeout() { return NoTimeout$.MODULE$; }
}
......@@ -147,49 +147,69 @@ object UnsupportedOperationChecker {
throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
"streaming DataFrames/Datasets")
// mapGroupsWithState: Allowed only when no aggregation + Update output mode
case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
if (collectStreamingAggregates(plan).isEmpty) {
if (outputMode != InternalOutputModes.Update) {
throwError("mapGroupsWithState is not supported with " +
s"$outputMode output mode on a streaming DataFrame/Dataset")
} else {
// Allowed when no aggregation + Update output mode
}
} else {
throwError("mapGroupsWithState is not supported with aggregation " +
"on a streaming DataFrame/Dataset")
}
// flatMapGroupsWithState without aggregation
case m: FlatMapGroupsWithState
if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
m.outputMode match {
case InternalOutputModes.Update =>
if (outputMode != InternalOutputModes.Update) {
throwError("flatMapGroupsWithState in update mode is not supported with " +
// mapGroupsWithState and flatMapGroupsWithState
case m: FlatMapGroupsWithState if m.isStreaming =>
// Check compatibility with output modes and aggregations in query
val aggsAfterFlatMapGroups = collectStreamingAggregates(plan)
if (m.isMapGroupsWithState) { // check mapGroupsWithState
// allowed only in update query output mode and without aggregation
if (aggsAfterFlatMapGroups.nonEmpty) {
throwError(
"mapGroupsWithState is not supported with aggregation " +
"on a streaming DataFrame/Dataset")
} else if (outputMode != InternalOutputModes.Update) {
throwError(
"mapGroupsWithState is not supported with " +
s"$outputMode output mode on a streaming DataFrame/Dataset")
}
} else { // check latMapGroupsWithState
if (aggsAfterFlatMapGroups.isEmpty) {
// flatMapGroupsWithState without aggregation: operation's output mode must
// match query output mode
m.outputMode match {
case InternalOutputModes.Update if outputMode != InternalOutputModes.Update =>
throwError(
"flatMapGroupsWithState in update mode is not supported with " +
s"$outputMode output mode on a streaming DataFrame/Dataset")
case InternalOutputModes.Append if outputMode != InternalOutputModes.Append =>
throwError(
"flatMapGroupsWithState in append mode is not supported with " +
s"$outputMode output mode on a streaming DataFrame/Dataset")
case _ =>
}
case InternalOutputModes.Append =>
if (outputMode != InternalOutputModes.Append) {
throwError("flatMapGroupsWithState in append mode is not supported with " +
s"$outputMode output mode on a streaming DataFrame/Dataset")
} else {
// flatMapGroupsWithState with aggregation: update operation mode not allowed, and
// *groupsWithState after aggregation not allowed
if (m.outputMode == InternalOutputModes.Update) {
throwError(
"flatMapGroupsWithState in update mode is not supported with " +
"aggregation on a streaming DataFrame/Dataset")
} else if (collectStreamingAggregates(m).nonEmpty) {
throwError(
"flatMapGroupsWithState in append mode is not supported after " +
s"aggregation on a streaming DataFrame/Dataset")
}
}
}
// flatMapGroupsWithState(Update) with aggregation
case m: FlatMapGroupsWithState
if m.isStreaming && m.outputMode == InternalOutputModes.Update
&& collectStreamingAggregates(plan).nonEmpty =>
throwError("flatMapGroupsWithState in update mode is not supported with " +
"aggregation on a streaming DataFrame/Dataset")
// flatMapGroupsWithState(Append) with aggregation
case m: FlatMapGroupsWithState
if m.isStreaming && m.outputMode == InternalOutputModes.Append
&& collectStreamingAggregates(m).nonEmpty =>
throwError("flatMapGroupsWithState in append mode is not supported after " +
s"aggregation on a streaming DataFrame/Dataset")
// Check compatibility with timeout configs
if (m.timeout == EventTimeTimeout) {
// With event time timeout, watermark must be defined.
val watermarkAttributes = m.child.output.collect {
case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
}
if (watermarkAttributes.isEmpty) {
throwError(
"Watermark must be specified in the query using " +
"'[Dataset/DataFrame].withWatermark()' for using event-time timeout in a " +
"[map|flatMap]GroupsWithState. Event-time timeout not supported without " +
"watermark.")(plan)
}
}
case d: Deduplicate if collectStreamingAggregates(d).nonEmpty =>
throwError("dropDuplicates is not supported after aggregation on a " +
......
......@@ -353,9 +353,10 @@ case class MapGroups(
/** Internal class representing State */
trait LogicalKeyedState[S]
/** Possible types of timeouts used in FlatMapGroupsWithState */
/** Types of timeouts used in FlatMapGroupsWithState */
case object NoTimeout extends KeyedStateTimeout
case object ProcessingTimeTimeout extends KeyedStateTimeout
case object EventTimeTimeout extends KeyedStateTimeout
/** Factory for constructing new `MapGroupsWithState` nodes. */
object FlatMapGroupsWithState {
......
......@@ -345,6 +345,22 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode = Append,
expectedMsgs = Seq("Mixing mapGroupsWithStates and flatMapGroupsWithStates"))
// mapGroupsWithState with event time timeout + watermark
assertNotSupportedInStreamingPlan(
"mapGroupsWithState - mapGroupsWithState with event time timeout without watermark",
FlatMapGroupsWithState(
null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true,
EventTimeTimeout, streamRelation),
outputMode = Update,
expectedMsgs = Seq("watermark"))
assertSupportedInStreamingPlan(
"mapGroupsWithState - mapGroupsWithState with event time timeout with watermark",
FlatMapGroupsWithState(
null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true,
EventTimeTimeout, new TestStreamingRelation(attributeWithWatermark)),
outputMode = Update)
// Deduplicate
assertSupportedInStreamingPlan(
"Deduplicate - Deduplicate on streaming relation before aggregation",
......
......@@ -336,8 +336,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
timeout, child) =>
val execPlan = FlatMapGroupsWithStateExec(
func, keyDeser, valueDeser, groupAttr, dataAttr, outputAttr, None, stateEnc, outputMode,
timeout, batchTimestampMs = KeyedStateImpl.NO_BATCH_PROCESSING_TIMESTAMP,
planLater(child))
timeout, batchTimestampMs = None, eventTimeWatermark = None, planLater(child))
execPlan :: Nil
case _ =>
Nil
......
......@@ -19,13 +19,14 @@ package org.apache.spark.sql.execution.streaming
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, Literal, SortOrder, SpecificInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalKeyedState, ProcessingTimeTimeout}
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, Literal, SortOrder, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.streaming.KeyedStateImpl.NO_TIMESTAMP
import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.streaming.{KeyedStateTimeout, OutputMode}
import org.apache.spark.sql.types.{BooleanType, IntegerType}
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.util.CompletionIterator
/**
......@@ -39,7 +40,7 @@ import org.apache.spark.util.CompletionIterator
* @param outputObjAttr used to define the output object
* @param stateEncoder used to serialize/deserialize state before calling `func`
* @param outputMode the output mode of `func`
* @param timeout used to timeout groups that have not received data in a while
* @param timeoutConf used to timeout groups that have not received data in a while
* @param batchTimestampMs processing timestamp of the current batch.
*/
case class FlatMapGroupsWithStateExec(
......@@ -52,11 +53,15 @@ case class FlatMapGroupsWithStateExec(
stateId: Option[OperatorStateId],
stateEncoder: ExpressionEncoder[Any],
outputMode: OutputMode,
timeout: KeyedStateTimeout,
batchTimestampMs: Long,
child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter {
timeoutConf: KeyedStateTimeout,
batchTimestampMs: Option[Long],
override val eventTimeWatermark: Option[Long],
child: SparkPlan
) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter with WatermarkSupport {
private val isTimeoutEnabled = timeout == ProcessingTimeTimeout
import KeyedStateImpl._
private val isTimeoutEnabled = timeoutConf != NoTimeout
private val timestampTimeoutAttribute =
AttributeReference("timeoutTimestamp", dataType = IntegerType, nullable = false)()
private val stateAttributes: Seq[Attribute] = {
......@@ -64,8 +69,6 @@ case class FlatMapGroupsWithStateExec(
if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs
}
import KeyedStateImpl._
/** Distribute by grouping attributes */
override def requiredChildDistribution: Seq[Distribution] =
ClusteredDistribution(groupingAttributes) :: Nil
......@@ -74,9 +77,21 @@ case class FlatMapGroupsWithStateExec(
override def requiredChildOrdering: Seq[Seq[SortOrder]] =
Seq(groupingAttributes.map(SortOrder(_, Ascending)))
override def keyExpressions: Seq[Attribute] = groupingAttributes
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
// Throw errors early if parameters are not as expected
timeoutConf match {
case ProcessingTimeTimeout =>
require(batchTimestampMs.nonEmpty)
case EventTimeTimeout =>
require(eventTimeWatermark.nonEmpty) // watermark value has been populated
require(watermarkExpression.nonEmpty) // input schema has watermark attribute
case _ =>
}
child.execute().mapPartitionsWithStateStore[InternalRow](
getStateId.checkpointLocation,
getStateId.operatorId,
......@@ -84,15 +99,23 @@ case class FlatMapGroupsWithStateExec(
groupingAttributes.toStructType,
stateAttributes.toStructType,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iterator) =>
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
val updater = new StateStoreUpdater(store)
// If timeout is based on event time, then filter late data based on watermark
val filteredIter = watermarkPredicateForData match {
case Some(predicate) if timeoutConf == EventTimeTimeout =>
iter.filter(row => !predicate.eval(row))
case None =>
iter
}
// Generate a iterator that returns the rows grouped by the grouping function
// Note that this code ensures that the filtering for timeout occurs only after
// all the data has been processed. This is to ensure that the timeout information of all
// the keys with data is updated before they are processed for timeouts.
val outputIterator =
updater.updateStateForKeysWithData(iterator) ++ updater.updateStateForTimedOutKeys()
updater.updateStateForKeysWithData(filteredIter) ++ updater.updateStateForTimedOutKeys()
// Return an iterator of all the rows generated by all the keys, such that when fully
// consumed, all the state updates will be committed by the state store
......@@ -124,7 +147,7 @@ case class FlatMapGroupsWithStateExec(
private val stateSerializer = {
val encoderSerializer = stateEncoder.namedExpressions
if (isTimeoutEnabled) {
encoderSerializer :+ Literal(KeyedStateImpl.TIMEOUT_TIMESTAMP_NOT_SET)
encoderSerializer :+ Literal(KeyedStateImpl.NO_TIMESTAMP)
} else {
encoderSerializer
}
......@@ -157,16 +180,19 @@ case class FlatMapGroupsWithStateExec(
/** Find the groups that have timeout set and are timing out right now, and call the function */
def updateStateForTimedOutKeys(): Iterator[InternalRow] = {
if (isTimeoutEnabled) {
val timeoutThreshold = timeoutConf match {
case ProcessingTimeTimeout => batchTimestampMs.get
case EventTimeTimeout => eventTimeWatermark.get
case _ =>
throw new IllegalStateException(
s"Cannot filter timed out keys for $timeoutConf")
}
val timingOutKeys = store.filter { case (_, stateRow) =>
val timeoutTimestamp = getTimeoutTimestamp(stateRow)
timeoutTimestamp != TIMEOUT_TIMESTAMP_NOT_SET && timeoutTimestamp < batchTimestampMs
timeoutTimestamp != NO_TIMESTAMP && timeoutTimestamp < timeoutThreshold
}
timingOutKeys.flatMap { case (keyRow, stateRow) =>
callFunctionAndUpdateState(
keyRow,
Iterator.empty,
Some(stateRow),
hasTimedOut = true)
callFunctionAndUpdateState(keyRow, Iterator.empty, Some(stateRow), hasTimedOut = true)
}
} else Iterator.empty
}
......@@ -186,7 +212,11 @@ case class FlatMapGroupsWithStateExec(
val valueObjIter = valueRowIter.map(getValueObj.apply) // convert value rows to objects
val stateObjOption = getStateObj(prevStateRowOption)
val keyedState = new KeyedStateImpl(
stateObjOption, batchTimestampMs, isTimeoutEnabled, hasTimedOut)
stateObjOption,
batchTimestampMs.getOrElse(NO_TIMESTAMP),
eventTimeWatermark.getOrElse(NO_TIMESTAMP),
timeoutConf,
hasTimedOut)
// Call function, get the returned objects and convert them to rows
val mappedIterator = func(keyObj, valueObjIter, keyedState).map { obj =>
......@@ -196,8 +226,6 @@ case class FlatMapGroupsWithStateExec(
// When the iterator is consumed, then write changes to state
def onIteratorCompletion: Unit = {
// Has the timeout information changed
if (keyedState.hasRemoved) {
store.remove(keyRow)
numUpdatedStateRows += 1
......@@ -205,26 +233,25 @@ case class FlatMapGroupsWithStateExec(
} else {
val previousTimeoutTimestamp = prevStateRowOption match {
case Some(row) => getTimeoutTimestamp(row)
case None => TIMEOUT_TIMESTAMP_NOT_SET
case None => NO_TIMESTAMP
}
val currentTimeoutTimestamp = keyedState.getTimeoutTimestamp
val stateRowToWrite = if (keyedState.hasUpdated) {
getStateRow(keyedState.get)
} else {
prevStateRowOption.orNull
}
val hasTimeoutChanged = keyedState.getTimeoutTimestamp != previousTimeoutTimestamp
val hasTimeoutChanged = currentTimeoutTimestamp != previousTimeoutTimestamp
val shouldWriteState = keyedState.hasUpdated || hasTimeoutChanged
if (shouldWriteState) {
if (stateRowToWrite == null) {
// This should never happen because checks in KeyedStateImpl should avoid cases
// where empty state would need to be written
throw new IllegalStateException(
"Attempting to write empty state")
throw new IllegalStateException("Attempting to write empty state")
}
setTimeoutTimestamp(stateRowToWrite, keyedState.getTimeoutTimestamp)
setTimeoutTimestamp(stateRowToWrite, currentTimeoutTimestamp)
store.put(keyRow.copy(), stateRowToWrite.copy())
numUpdatedStateRows += 1
}
......@@ -247,7 +274,7 @@ case class FlatMapGroupsWithStateExec(
/** Returns the timeout timestamp of a state row is set */
def getTimeoutTimestamp(stateRow: UnsafeRow): Long = {
if (isTimeoutEnabled) stateRow.getLong(timeoutTimestampIndex) else TIMEOUT_TIMESTAMP_NOT_SET
if (isTimeoutEnabled) stateRow.getLong(timeoutTimestampIndex) else NO_TIMESTAMP
}
/** Set the timestamp in a state row */
......
......@@ -108,7 +108,10 @@ class IncrementalExecution(
case m: FlatMapGroupsWithStateExec =>
val stateId =
OperatorStateId(checkpointLocation, operatorId.getAndIncrement(), currentBatchId)
m.copy(stateId = Some(stateId), batchTimestampMs = offsetSeqMetadata.batchTimestampMs)
m.copy(
stateId = Some(stateId),
batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs))
}
}
......
......@@ -17,37 +17,45 @@
package org.apache.spark.sql.execution.streaming
import java.sql.Date
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.streaming.KeyedState
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout}
import org.apache.spark.sql.execution.streaming.KeyedStateImpl._
import org.apache.spark.sql.streaming.{KeyedState, KeyedStateTimeout}
import org.apache.spark.unsafe.types.CalendarInterval
/**
* Internal implementation of the [[KeyedState]] interface. Methods are not thread-safe.
* @param optionalValue Optional value of the state
* @param batchProcessingTimeMs Processing time of current batch, used to calculate timestamp
* for processing time timeouts
* @param isTimeoutEnabled Whether timeout is enabled. This will be used to check whether the user
* is allowed to configure timeouts.
* @param timeoutConf Type of timeout configured. Based on this, different operations will
* be supported.
* @param hasTimedOut Whether the key for which this state wrapped is being created is
* getting timed out or not.
*/
private[sql] class KeyedStateImpl[S](
optionalValue: Option[S],
batchProcessingTimeMs: Long,
isTimeoutEnabled: Boolean,
eventTimeWatermarkMs: Long,
timeoutConf: KeyedStateTimeout,
override val hasTimedOut: Boolean) extends KeyedState[S] {
import KeyedStateImpl._
// Constructor to create dummy state when using mapGroupsWithState in a batch query
def this(optionalValue: Option[S]) = this(
optionalValue, -1, isTimeoutEnabled = false, hasTimedOut = false)
optionalValue,
batchProcessingTimeMs = NO_TIMESTAMP,
eventTimeWatermarkMs = NO_TIMESTAMP,
timeoutConf = KeyedStateTimeout.NoTimeout,
hasTimedOut = false)
private var value: S = optionalValue.getOrElse(null.asInstanceOf[S])
private var defined: Boolean = optionalValue.isDefined
private var updated: Boolean = false // whether value has been updated (but not removed)
private var removed: Boolean = false // whether value has been removed
private var timeoutTimestamp: Long = TIMEOUT_TIMESTAMP_NOT_SET
private var timeoutTimestamp: Long = NO_TIMESTAMP
// ========= Public API =========
override def exists: Boolean = defined
......@@ -82,13 +90,14 @@ private[sql] class KeyedStateImpl[S](
defined = false
updated = false
removed = true
timeoutTimestamp = TIMEOUT_TIMESTAMP_NOT_SET
timeoutTimestamp = NO_TIMESTAMP
}
override def setTimeoutDuration(durationMs: Long): Unit = {
if (!isTimeoutEnabled) {
if (timeoutConf != ProcessingTimeTimeout) {
throw new UnsupportedOperationException(
"Cannot set timeout information without enabling timeout in map/flatMapGroupsWithState")
"Cannot set timeout duration without enabling processing time timeout in " +
"map/flatMapGroupsWithState")
}
if (!defined) {
throw new IllegalStateException(
......@@ -99,7 +108,7 @@ private[sql] class KeyedStateImpl[S](
if (durationMs <= 0) {
throw new IllegalArgumentException("Timeout duration must be positive")
}
if (!removed && batchProcessingTimeMs != NO_BATCH_PROCESSING_TIMESTAMP) {
if (!removed && batchProcessingTimeMs != NO_TIMESTAMP) {
timeoutTimestamp = durationMs + batchProcessingTimeMs
} else {
// This is being called in a batch query, hence no processing timestamp.
......@@ -108,29 +117,55 @@ private[sql] class KeyedStateImpl[S](
}
override def setTimeoutDuration(duration: String): Unit = {
if (StringUtils.isBlank(duration)) {
throw new IllegalArgumentException(
"The window duration, slide duration and start time cannot be null or blank.")
}
val intervalString = if (duration.startsWith("interval")) {
duration
} else {
"interval " + duration
setTimeoutDuration(parseDuration(duration))
}
@throws[IllegalArgumentException]("if 'timestampMs' is not positive")
@throws[IllegalStateException]("when state is either not initialized, or already removed")
@throws[UnsupportedOperationException](
"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
override def setTimeoutTimestamp(timestampMs: Long): Unit = {
checkTimeoutTimestampAllowed()
if (timestampMs <= 0) {
throw new IllegalArgumentException("Timeout timestamp must be positive")
}
val cal = CalendarInterval.fromString(intervalString)
if (cal == null) {
if (eventTimeWatermarkMs != NO_TIMESTAMP && timestampMs < eventTimeWatermarkMs) {
throw new IllegalArgumentException(
s"The provided duration ($duration) is not valid.")
s"Timeout timestamp ($timestampMs) cannot be earlier than the " +
s"current watermark ($eventTimeWatermarkMs)")
}
if (cal.milliseconds < 0 || cal.months < 0) {
throw new IllegalArgumentException("Timeout duration must be positive")
if (!removed && batchProcessingTimeMs != NO_TIMESTAMP) {
timeoutTimestamp = timestampMs
} else {
// This is being called in a batch query, hence no processing timestamp.
// Just ignore any attempts to set timeout.
}
}
val delayMs = {
val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
cal.milliseconds + cal.months * millisPerMonth
}
setTimeoutDuration(delayMs)
@throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
@throws[IllegalStateException]("when state is either not initialized, or already removed")
@throws[UnsupportedOperationException](
"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
checkTimeoutTimestampAllowed()
setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
}
@throws[IllegalStateException]("when state is either not initialized, or already removed")
@throws[UnsupportedOperationException](
"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
override def setTimeoutTimestamp(timestamp: Date): Unit = {
checkTimeoutTimestampAllowed()
setTimeoutTimestamp(timestamp.getTime)
}
@throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
@throws[IllegalStateException]("when state is either not initialized, or already removed")
@throws[UnsupportedOperationException](
"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
checkTimeoutTimestampAllowed()
setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
}
override def toString: String = {
......@@ -147,14 +182,46 @@ private[sql] class KeyedStateImpl[S](
/** Return timeout timestamp or `TIMEOUT_TIMESTAMP_NOT_SET` if not set */
def getTimeoutTimestamp: Long = timeoutTimestamp
private def parseDuration(duration: String): Long = {
if (StringUtils.isBlank(duration)) {
throw new IllegalArgumentException(
"Provided duration is null or blank.")
}
val intervalString = if (duration.startsWith("interval")) {
duration
} else {
"interval " + duration
}
val cal = CalendarInterval.fromString(intervalString)
if (cal == null) {
throw new IllegalArgumentException(
s"Provided duration ($duration) is not valid.")
}
if (cal.milliseconds < 0 || cal.months < 0) {
throw new IllegalArgumentException(s"Provided duration ($duration) is not positive")
}
val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
cal.milliseconds + cal.months * millisPerMonth
}
private def checkTimeoutTimestampAllowed(): Unit = {
if (timeoutConf != EventTimeTimeout) {
throw new UnsupportedOperationException(
"Cannot set timeout timestamp without enabling event time timeout in " +
"map/flatMapGroupsWithState")
}
if (!defined) {
throw new IllegalStateException(
"Cannot set timeout timestamp without any state value, " +
"state has either not been initialized, or has already been removed")
}
}
}
private[sql] object KeyedStateImpl {
// Value used in the state row to represent the lack of any timeout timestamp
val TIMEOUT_TIMESTAMP_NOT_SET = -1L
// Value to represent that no batch processing timestamp is passed to KeyedStateImpl. This is
// used in batch queries where there are no streaming batches and timeouts.
val NO_BATCH_PROCESSING_TIMESTAMP = -1L
// Value used represent the lack of valid timestamp as a long
val NO_TIMESTAMP = -1L
}
......@@ -80,7 +80,7 @@ trait WatermarkSupport extends UnaryExecNode {
/** Generate an expression that matches data older than the watermark */
lazy val watermarkExpression: Option[Expression] = {
val optionalWatermarkAttribute =
keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey))
child.output.find(_.metadata.contains(EventTimeWatermark.delayKey))
optionalWatermarkAttribute.map { watermarkAttribute =>
// If we are evicting based on a window, use the end of the window. Otherwise just
......@@ -101,14 +101,12 @@ trait WatermarkSupport extends UnaryExecNode {
}
}
/** Generate a predicate based on keys that matches data older than the watermark */
/** Predicate based on keys that matches data older than the watermark */
lazy val watermarkPredicateForKeys: Option[Predicate] =
watermarkExpression.map(newPredicate(_, keyExpressions))
/**
* Generate a predicate based on the child output that matches data older than the watermark.
*/
lazy val watermarkPredicate: Option[Predicate] =
/** Predicate based on the child output that matches data older than the watermark. */
lazy val watermarkPredicateForData: Option[Predicate] =
watermarkExpression.map(newPredicate(_, child.output))
}
......@@ -218,7 +216,7 @@ case class StateStoreSaveExec(
new Iterator[InternalRow] {
// Filter late date using watermark if specified
private[this] val baseIterator = watermarkPredicate match {
private[this] val baseIterator = watermarkPredicateForData match {
case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
case None => iter
}
......@@ -285,7 +283,7 @@ case class StreamingDeduplicateExec(
val numTotalStateRows = longMetric("numTotalStateRows")
val numUpdatedStateRows = longMetric("numUpdatedStateRows")
val baseIterator = watermarkPredicate match {
val baseIterator = watermarkPredicateForData match {
case Some(predicate) => iter.filter(row => !predicate.eval(row))
case None => iter
}
......
......@@ -55,7 +55,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState
* batch, nor with streaming Datasets.
* - All the data will be shuffled before applying the function.
* - If timeout is set, then the function will also be called with no values.
* See more details on KeyedStateTimeout` below.
* See more details on `KeyedStateTimeout` below.
*
* Important points to note about using `KeyedState`.
* - The value of the state cannot be null. So updating state with null will throw
......@@ -68,20 +68,38 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState
*
* Important points to note about using `KeyedStateTimeout`.
* - The timeout type is a global param across all the keys (set as `timeout` param in
* `[map|flatMap]GroupsWithState`, but the exact timeout duration is configurable per key
* (by calling `setTimeout...()` in `KeyedState`).
* - When the timeout occurs for a key, the function is called with no values, and
* `[map|flatMap]GroupsWithState`, but the exact timeout duration/timestamp is configurable per
* key by calling `setTimeout...()` in `KeyedState`.
* - Timeouts can be either based on processing time (i.e.
* [[KeyedStateTimeout.ProcessingTimeTimeout]]) or event time (i.e.
* [[KeyedStateTimeout.EventTimeTimeout]]).
* - With `ProcessingTimeTimeout`, the timeout duration can be set by calling
* `KeyedState.setTimeoutDuration`. The timeout will occur when the clock has advanced by the set
* duration. Guarantees provided by this timeout with a duration of D ms are as follows:
* - Timeout will never be occur before the clock time has advanced by D ms
* - Timeout will occur eventually when there is a trigger in the query
* (i.e. after D ms). So there is a no strict upper bound on when the timeout would occur.
* For example, the trigger interval of the query will affect when the timeout actually occurs.
* If there is no data in the stream (for any key) for a while, then their will not be
* any trigger and timeout function call will not occur until there is data.
* - Since the processing time timeout is based on the clock time, it is affected by the
* variations in the system clock (i.e. time zone changes, clock skew, etc.).
* - With `EventTimeTimeout`, the user also has to specify the the the event time watermark in
* the query using `Dataset.withWatermark()`. With this setting, data that is older than the
* watermark are filtered out. The timeout can be enabled for a key by setting a timestamp using
* `KeyedState.setTimeoutTimestamp()`, and the timeout would occur when the watermark advances
* beyond the set timestamp. You can control the timeout delay by two parameters - (i) watermark
* delay and an additional duration beyond the timestamp in the event (which is guaranteed to
* > watermark due to the filtering). Guarantees provided by this timeout are as follows:
* - Timeout will never be occur before watermark has exceeded the set timeout.
* - Similar to processing time timeouts, there is a no strict upper bound on the delay when
* the timeout actually occurs. The watermark can advance only when there is data in the
* stream, and the event time of the data has actually advanced.
* - When the timeout occurs for a key, the function is called for that key with no values, and
* `KeyedState.hasTimedOut()` set to true.
* - The timeout is reset for key every time the function is called on the key, that is,
* when the key has new data, or the key has timed out. So the user has to set the timeout
* duration every time the function is called, otherwise there will not be any timeout set.
* - Guarantees provided on processing-time-based timeout of key, when timeout duration is D ms:
* - Timeout will never be called before real clock time has advanced by D ms
* - Timeout will be called eventually when there is a trigger in the query
* (i.e. after D ms). So there is a no strict upper bound on when the timeout would occur.
* For example, the trigger interval of the query will affect when the timeout is actually hit.
* If there is no data in the stream (for any key) for a while, then their will not be
* any trigger and timeout will not be hit until there is data.
*
* Scala example of using KeyedState in `mapGroupsWithState`:
* {{{
......@@ -194,7 +212,8 @@ trait KeyedState[S] extends LogicalKeyedState[S] {
/**
* Set the timeout duration in ms for this key.
* @note Timeouts must be enabled in `[map/flatmap]GroupsWithStates`.
*
* @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
*/
@throws[IllegalArgumentException]("if 'durationMs' is not positive")
@throws[IllegalStateException]("when state is either not initialized, or already removed")
......@@ -204,11 +223,63 @@ trait KeyedState[S] extends LogicalKeyedState[S] {
/**
* Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
* @note, Timeouts must be enabled in `[map/flatmap]GroupsWithStates`.
*
* @note, ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
*/
@throws[IllegalArgumentException]("if 'duration' is not a valid duration")
@throws[IllegalStateException]("when state is either not initialized, or already removed")
@throws[UnsupportedOperationException](
"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
def setTimeoutDuration(duration: String): Unit
@throws[IllegalArgumentException]("if 'timestampMs' is not positive")
@throws[IllegalStateException]("when state is either not initialized, or already removed")
@throws[UnsupportedOperationException](
"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
/**
* Set the timeout timestamp for this key as milliseconds in epoch time.
* This timestamp cannot be older than the current watermark.
*
* @note, EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
*/
def setTimeoutTimestamp(timestampMs: Long): Unit
@throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
@throws[IllegalStateException]("when state is either not initialized, or already removed")
@throws[UnsupportedOperationException](
"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
/**
* Set the timeout timestamp for this key as milliseconds in epoch time and an additional
* duration as a string (e.g. "1 hour", "2 days", etc.).
* The final timestamp (including the additional duration) cannot be older than the
* current watermark.
*
* @note, EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
*/
def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
@throws[IllegalStateException]("when state is either not initialized, or already removed")
@throws[UnsupportedOperationException](
"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
/**
* Set the timeout timestamp for this key as a java.sql.Date.
* This timestamp cannot be older than the current watermark.
*
* @note, EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
*/
def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
@throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
@throws[IllegalStateException]("when state is either not initialized, or already removed")
@throws[UnsupportedOperationException](
"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
/**
* Set the timeout timestamp for this key as a java.sql.Date and an additional
* duration as a string (e.g. "1 hour", "2 days", etc.).
* The final timestamp (including the additional duration) cannot be older than the
* current watermark.
*
* @note, EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
*/
def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment