Skip to content
Snippets Groups Projects
Commit 2c95e4e9 authored by jerryshao's avatar jerryshao Committed by Andrew Or
Browse files

[SPARK-14455][STREAMING] Fix NPE in allocatedExecutors when calling in receiver-less scenario

## What changes were proposed in this pull request?

When calling `ReceiverTracker#allocatedExecutors` in a receiver-less scenario, an NPE will be thrown, because the `ReceiverTracker` is never actually started and `endpoint` is never created.

This happens when using streaming dynamic allocation with direct Kafka.

## How was this patch tested?

A local integration test was performed.

Author: jerryshao <sshao@hortonworks.com>

Closes #12236 from jerryshao/SPARK-14455.
parent 3fb09afd
No related branches found
No related tags found
No related merge requests found
...@@ -240,9 +240,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false ...@@ -240,9 +240,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
* Get the executors allocated to each receiver. * Get the executors allocated to each receiver.
* @return a map containing receiver ids to optional executor ids. * @return a map containing receiver ids to optional executor ids.
*/ */
/**
 * Get the executors allocated to each receiver.
 *
 * Guarded by `isTrackerStarted`: in a receiver-less scenario (e.g. direct Kafka
 * input), the tracker is never started and `endpoint` is null, so asking it
 * would throw an NPE (SPARK-14455). With no receivers there are no allocated
 * executors, so an empty map is returned instead.
 *
 * @return a map from receiver id to the optional id of the executor running it.
 */
def allocatedExecutors(): Map[Int, Option[String]] = synchronized {
  if (isTrackerStarted) {
    // Build the result strictly: `mapValues` returns a lazy view that
    // re-evaluates the transformation on every lookup (and is deprecated in
    // Scala 2.13), so materialize the map with an explicit `map` instead.
    endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo)
      .map { case (receiverId, info) =>
        receiverId -> info.runningExecutor.map(_.executorId)
      }
  } else {
    // Tracker was never started (no receiver input streams registered), so no
    // executors are allocated to receivers.
    Map.empty
  }
}
......
...@@ -26,7 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, TaskLo ...@@ -26,7 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, TaskLo
import org.apache.spark.scheduler.TaskLocality.TaskLocality import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming._ import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.dstream.{ConstantInputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver._ import org.apache.spark.streaming.receiver._
/** Testsuite for receiver scheduling */ /** Testsuite for receiver scheduling */
...@@ -102,6 +102,27 @@ class ReceiverTrackerSuite extends TestSuiteBase { ...@@ -102,6 +102,27 @@ class ReceiverTrackerSuite extends TestSuiteBase {
} }
} }
} }
/** Exercises `ReceiverTracker.allocatedExecutors` with and without receivers (SPARK-14455). */
test("get allocated executors") {
  // With one registered receiver the tracker is started, so exactly one
  // receiver-to-executor entry should be reported.
  withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
    val receiverStream = ssc.receiverStream(new TestReceiver)
    val sink = new TestOutputStream(receiverStream)
    sink.register()
    ssc.start()
    assert(ssc.scheduler.receiverTracker.allocatedExecutors().size === 1)
  }

  // With no receivers the tracker is never started; the call must return an
  // empty map rather than throwing an NPE.
  withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
    val staticRdd = ssc.sc.parallelize(1 to 10)
    val constantStream = new ConstantInputDStream(ssc, staticRdd)
    val sink = new TestOutputStream(constantStream)
    sink.register()
    ssc.start()
    assert(ssc.scheduler.receiverTracker.allocatedExecutors() === Map.empty)
  }
}
} }
/** An input DStream with for testing rate controlling */ /** An input DStream with for testing rate controlling */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment