Commit 6c4b9f4b authored by Sean Owen

[SPARK-16395][STREAMING] Fail if too many CheckpointWriteHandlers are queued up in the fixed thread pool

## What changes were proposed in this pull request?

Begin failing if checkpoint writes are unlikely to keep up with the storage layer's ability to write them, so the job fails fast instead of slowly filling memory with queued checkpoint payloads.

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #14152 from srowen/SPARK-16395.
parent 8310c074
```diff
@@ -18,8 +18,8 @@
 package org.apache.spark.streaming

 import java.io._
-import java.util.concurrent.Executors
-import java.util.concurrent.RejectedExecutionException
+import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionException,
+  ThreadPoolExecutor, TimeUnit}

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
```
```diff
@@ -184,7 +184,14 @@ class CheckpointWriter(
     hadoopConf: Configuration
   ) extends Logging {
   val MAX_ATTEMPTS = 3
-  val executor = Executors.newFixedThreadPool(1)
+
+  // Single-thread executor which rejects executions when a large amount have queued up.
+  // This fails fast since this typically means the checkpoint store will never keep up, and
+  // will otherwise lead to filling memory with waiting payloads of byte[] to write.
+  val executor = new ThreadPoolExecutor(
+    1, 1,
+    0L, TimeUnit.MILLISECONDS,
+    new ArrayBlockingQueue[Runnable](1000))
   val compressionCodec = CompressionCodec.createCodec(conf)
   private var stopped = false
   @volatile private[this] var fs: FileSystem = null
```