Skip to content
Snippets Groups Projects
Commit e9b3afac authored by Shixiong Zhu, committed by Tathagata Das
Browse files

[SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite


## What changes were proposed in this pull request?

Fixed the following failures:

```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 3745 times over 1.0000790851666665 minutes. Last failure message: assertion failed: failOnDataLoss-0 not deleted after timeout.
```

```
sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query query-66 terminated with exception: null
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:146)
Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: null
	at java.util.ArrayList.addAll(ArrayList.java:577)
	at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:257)
	at org.apache.kafka.clients.Metadata.update(Metadata.java:177)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:605)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:582)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
	at
...
```

## How was this patch tested?

Tested in #16048 by running many times.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16109 from zsxwing/fix-kafka-flaky-test.

(cherry picked from commit edc87e18)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
parent 76e1f165
No related branches found
No related tags found
No related merge requests found
...@@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private( ...@@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private(
var toFetchOffset = offset var toFetchOffset = offset
while (toFetchOffset != UNKNOWN_OFFSET) { while (toFetchOffset != UNKNOWN_OFFSET) {
try { try {
return fetchData(toFetchOffset, pollTimeoutMs) return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
} catch { } catch {
case e: OffsetOutOfRangeException => case e: OffsetOutOfRangeException =>
// When there is some error thrown, it's better to use a new consumer to drop all cached // When there is some error thrown, it's better to use a new consumer to drop all cached
...@@ -159,14 +159,18 @@ private[kafka010] case class CachedKafkaConsumer private( ...@@ -159,14 +159,18 @@ private[kafka010] case class CachedKafkaConsumer private(
} }
/** /**
* Get the record at `offset`. * Get the record for the given offset if available. Otherwise it will either throw error
* (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
* or null.
* *
* @throws OffsetOutOfRangeException if `offset` is out of range * @throws OffsetOutOfRangeException if `offset` is out of range
* @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds. * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
*/ */
private def fetchData( private def fetchData(
offset: Long, offset: Long,
pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { untilOffset: Long,
pollTimeoutMs: Long,
failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
// This is the first fetch, or the last pre-fetched data has been drained. // This is the first fetch, or the last pre-fetched data has been drained.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this. // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
...@@ -190,10 +194,31 @@ private[kafka010] case class CachedKafkaConsumer private( ...@@ -190,10 +194,31 @@ private[kafka010] case class CachedKafkaConsumer private(
} else { } else {
val record = fetchedData.next() val record = fetchedData.next()
nextOffsetInFetchedData = record.offset + 1 nextOffsetInFetchedData = record.offset + 1
// `seek` is always called before "poll". So "record.offset" must be same as "offset". // In general, Kafka uses the specified offset as the start point, and tries to fetch the next
assert(record.offset == offset, // available offset. Hence we need to handle offset mismatch.
s"The fetched data has a different offset: expected $offset but was ${record.offset}") if (record.offset > offset) {
record // This may happen when some records aged out but their offsets already got verified
if (failOnDataLoss) {
reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
// Never happen as "reportDataLoss" will throw an exception
null
} else {
if (record.offset >= untilOffset) {
reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
null
} else {
reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
record
}
}
} else if (record.offset < offset) {
// This should not happen. If it does happen, then we probably misunderstand Kafka internal
// mechanism.
throw new IllegalStateException(
s"Tried to fetch $offset but the returned record offset was ${record.offset}")
} else {
record
}
} }
} }
......
...@@ -102,7 +102,7 @@ private[kafka010] case class KafkaSource( ...@@ -102,7 +102,7 @@ private[kafka010] case class KafkaSource(
sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
private val offsetFetchAttemptIntervalMs = private val offsetFetchAttemptIntervalMs =
sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
private val maxOffsetsPerTrigger = private val maxOffsetsPerTrigger =
sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong) sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
......
...@@ -31,11 +31,12 @@ import org.scalatest.concurrent.Eventually._ ...@@ -31,11 +31,12 @@ import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._ import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkContext
import org.apache.spark.sql.ForeachWriter import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest} import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
...@@ -811,6 +812,11 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared ...@@ -811,6 +812,11 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}" private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
override def createSparkSession(): TestSparkSession = {
// Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
}
override def beforeAll(): Unit = { override def beforeAll(): Unit = {
super.beforeAll() super.beforeAll()
testUtils = new KafkaTestUtils { testUtils = new KafkaTestUtils {
...@@ -839,7 +845,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared ...@@ -839,7 +845,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
} }
} }
ignore("stress test for failOnDataLoss=false") { test("stress test for failOnDataLoss=false") {
val reader = spark val reader = spark
.readStream .readStream
.format("kafka") .format("kafka")
...@@ -848,6 +854,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared ...@@ -848,6 +854,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
.option("subscribePattern", "failOnDataLoss.*") .option("subscribePattern", "failOnDataLoss.*")
.option("startingOffsets", "earliest") .option("startingOffsets", "earliest")
.option("failOnDataLoss", "false") .option("failOnDataLoss", "false")
.option("fetchOffset.retryIntervalMs", "3000")
val kafka = reader.load() val kafka = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)] .as[(String, String)]
......
...@@ -184,7 +184,7 @@ class KafkaTestUtils extends Logging { ...@@ -184,7 +184,7 @@ class KafkaTestUtils extends Logging {
def deleteTopic(topic: String): Unit = { def deleteTopic(topic: String): Unit = {
val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size
AdminUtils.deleteTopic(zkUtils, topic) AdminUtils.deleteTopic(zkUtils, topic)
verifyTopicDeletion(zkUtils, topic, partitions, List(this.server)) verifyTopicDeletionWithRetries(zkUtils, topic, partitions, List(this.server))
} }
/** Add new partitions to a Kafka topic */ /** Add new partitions to a Kafka topic */
...@@ -286,36 +286,57 @@ class KafkaTestUtils extends Logging { ...@@ -286,36 +286,57 @@ class KafkaTestUtils extends Logging {
props props
} }
/** Verify topic is deleted in all places, e.g, brokers, zookeeper. */
private def verifyTopicDeletion( private def verifyTopicDeletion(
topic: String,
numPartitions: Int,
servers: Seq[KafkaServer]): Unit = {
val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
import ZkUtils._
// wait until admin path for delete topic is deleted, signaling completion of topic deletion
assert(
!zkUtils.pathExists(getDeleteTopicPath(topic)),
s"${getDeleteTopicPath(topic)} still exists")
assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
// ensure that the topic-partition has been deleted from all brokers' replica managers
assert(servers.forall(server => topicAndPartitions.forall(tp =>
server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
s"topic $topic still exists in the replica manager")
// ensure that logs from all replicas are deleted if delete topic is marked successful
assert(servers.forall(server => topicAndPartitions.forall(tp =>
server.getLogManager().getLog(tp).isEmpty)),
s"topic $topic still exists in log mananger")
// ensure that topic is removed from all cleaner offsets
assert(servers.forall(server => topicAndPartitions.forall { tp =>
val checkpoints = server.getLogManager().logDirs.map { logDir =>
new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
}
checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
}), s"checkpoint for topic $topic still exists")
// ensure the topic is gone
assert(
!zkUtils.getAllTopics().contains(topic),
s"topic $topic still exists on zookeeper")
}
/** Verify topic is deleted. Retry to delete the topic if not. */
private def verifyTopicDeletionWithRetries(
zkUtils: ZkUtils, zkUtils: ZkUtils,
topic: String, topic: String,
numPartitions: Int, numPartitions: Int,
servers: Seq[KafkaServer]) { servers: Seq[KafkaServer]) {
import ZkUtils._ eventually(timeout(60.seconds), interval(200.millis)) {
val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _)) try {
def isDeleted(): Boolean = { verifyTopicDeletion(topic, numPartitions, servers)
// wait until admin path for delete topic is deleted, signaling completion of topic deletion } catch {
val deletePath = !zkUtils.pathExists(getDeleteTopicPath(topic)) case e: Throwable =>
val topicPath = !zkUtils.pathExists(getTopicPath(topic)) // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
// ensure that the topic-partition has been deleted from all brokers' replica managers // chance that a topic will be recreated after deletion due to the asynchronous update.
val replicaManager = servers.forall(server => topicAndPartitions.forall(tp => // Hence, delete the topic and retry.
server.replicaManager.getPartition(tp.topic, tp.partition) == None)) AdminUtils.deleteTopic(zkUtils, topic)
// ensure that logs from all replicas are deleted if delete topic is marked successful throw e
val logManager = servers.forall(server => topicAndPartitions.forall(tp => }
server.getLogManager().getLog(tp).isEmpty))
// ensure that topic is removed from all cleaner offsets
val cleaner = servers.forall(server => topicAndPartitions.forall { tp =>
val checkpoints = server.getLogManager().logDirs.map { logDir =>
new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
}
checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
})
// ensure the topic is gone
val deleted = !zkUtils.getAllTopics().contains(topic)
deletePath && topicPath && replicaManager && logManager && cleaner && deleted
}
eventually(timeout(60.seconds)) {
assert(isDeleted, s"$topic not deleted after timeout")
} }
} }
...@@ -331,7 +352,7 @@ class KafkaTestUtils extends Logging { ...@@ -331,7 +352,7 @@ class KafkaTestUtils extends Logging {
case _ => case _ =>
false false
} }
eventually(timeout(10.seconds)) { eventually(timeout(60.seconds)) {
assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout") assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
} }
} }
......
...@@ -48,14 +48,18 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach { ...@@ -48,14 +48,18 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
*/ */
protected implicit def sqlContext: SQLContext = _spark.sqlContext protected implicit def sqlContext: SQLContext = _spark.sqlContext
protected def createSparkSession: TestSparkSession = {
new TestSparkSession(
sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
}
/** /**
* Initialize the [[TestSparkSession]]. * Initialize the [[TestSparkSession]].
*/ */
protected override def beforeAll(): Unit = { protected override def beforeAll(): Unit = {
SparkSession.sqlListener.set(null) SparkSession.sqlListener.set(null)
if (_spark == null) { if (_spark == null) {
_spark = new TestSparkSession( _spark = createSparkSession
sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
} }
// Ensure we have initialized the context before calling parent code // Ensure we have initialized the context before calling parent code
super.beforeAll() super.beforeAll()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment