Commit 67582132 authored by zsxwing, committed by Tathagata Das

[SPARK-11063] [STREAMING] Change preferredLocations of Receiver's RDD to hosts rather than hostports

An RDD's preferredLocations must be plain hostnames, but the executors chosen by Streaming's receiver scheduling are identified as host:port strings, so the preferred locations never match and receiver placement doesn't work.

This PR converts `scheduledExecutors` to `hosts` before creating the Receiver's RDD.

Author: zsxwing <zsxwing@gmail.com>

Closes #9075 from zsxwing/SPARK-11063.
parent 59668179
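Before the diffs, the mismatch in miniature: a location recorded as "host1:34512" can never equal the plain hostname "host1" that the task scheduler compares against. A minimal standalone Scala sketch (object name is illustrative; parseHostPort is a simplified stand-in for the Spark-internal Utils.parseHostPort):

object HostPortMismatch {
  // Simplified stand-in for org.apache.spark.util.Utils.parseHostPort:
  // splits "host:port" into (host, port).
  def parseHostPort(hostPort: String): (String, Int) = {
    val idx = hostPort.lastIndexOf(':')
    if (idx < 0) (hostPort, 0)
    else (hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)
  }

  def main(args: Array[String]): Unit = {
    val scheduledExecutors = Seq("host1:34512", "host1:40123", "host2:34512")
    // Before the fix: host:port strings were used verbatim as preferred
    // locations, so they never matched a plain hostname.
    println(scheduledExecutors.contains("host1")) // false
    // After the fix: strip the port and deduplicate.
    val hosts = scheduledExecutors.map(hp => parseHostPort(hp)._1).distinct
    println(hosts)                   // List(host1, host2)
    println(hosts.contains("host1")) // true
  }
}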
streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -21,6 +21,7 @@ import scala.collection.Map
 import scala.collection.mutable
 
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
 
 /**
  * A class that tries to schedule receivers with evenly distributed. There are two phases for
@@ -79,7 +80,7 @@ private[streaming] class ReceiverSchedulingPolicy {
       return receivers.map(_.streamId -> Seq.empty).toMap
     }
 
-    val hostToExecutors = executors.groupBy(_.split(":")(0))
+    val hostToExecutors = executors.groupBy(executor => Utils.parseHostPort(executor)._1)
     val scheduledExecutors = Array.fill(receivers.length)(new mutable.ArrayBuffer[String])
     val numReceiversOnExecutor = mutable.HashMap[String, Int]()
     // Set the initial value to 0
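A hedged aside on the hostToExecutors change above: both the old split(":")(0) and the new Utils.parseHostPort(...)._1 extract the host; the commit switches to the canonical Spark utility. Reusing the simplified stand-in from the earlier sketch, the resulting map groups executors by node:

object HostToExecutorsSketch {
  // Same simplified stand-in for Utils.parseHostPort as above.
  def parseHostPort(hostPort: String): (String, Int) = {
    val idx = hostPort.lastIndexOf(':')
    if (idx < 0) (hostPort, 0)
    else (hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)
  }

  def main(args: Array[String]): Unit = {
    val executors = Seq("host1:1000", "host1:1001", "host2:1000")
    // Same shape as the hostToExecutors map built in the diff above.
    val hostToExecutors = executors.groupBy(e => parseHostPort(e)._1)
    println(hostToExecutors)
    // e.g. Map(host1 -> List(host1:1000, host1:1001), host2 -> List(host2:1000))
  }
}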
streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -551,7 +551,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         if (scheduledExecutors.isEmpty) {
           ssc.sc.makeRDD(Seq(receiver), 1)
         } else {
-          ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors))
+          val preferredLocations =
+            scheduledExecutors.map(hostPort => Utils.parseHostPort(hostPort)._1).distinct
+          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
         }
       receiverRDD.setName(s"Receiver $receiverId")
       ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
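The makeRDD overload used here takes Seq[(T, Seq[String])], where the second element of each pair becomes the preferred locations of that element's partition, which is why plain hostnames matter. A hedged sketch against a local SparkContext (object name, app name, and master are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object MakeRDDLocationsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("makeRDD-locations"))
    // One element, one partition, with two preferred hostnames attached.
    val rdd = sc.makeRDD(Seq("receiver" -> Seq("host1", "host2")))
    // preferredLocations reports the hostnames attached to the partition.
    println(rdd.preferredLocations(rdd.partitions(0))) // List(host1, host2)
    sc.stop()
  }
}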
streaming/src/test/scala/org/apache/spark/streaming/ReceiverTrackerSuite.scala
@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, TaskLocality}
+import org.apache.spark.scheduler.TaskLocality.TaskLocality
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
@@ -80,6 +82,28 @@ class ReceiverTrackerSuite extends TestSuiteBase {
       }
     }
   }
 
+  test("SPARK-11063: TaskSetManager should use Receiver RDD's preferredLocations") {
+    // Use ManualClock to prevent from starting batches so that we can make sure the only task is
+    // for starting the Receiver
+    val _conf = conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
+    withStreamingContext(new StreamingContext(_conf, Milliseconds(100))) { ssc =>
+      @volatile var receiverTaskLocality: TaskLocality = null
+      ssc.sparkContext.addSparkListener(new SparkListener {
+        override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+          receiverTaskLocality = taskStart.taskInfo.taskLocality
+        }
+      })
+      val input = ssc.receiverStream(new TestReceiver)
+      val output = new TestOutputStream(input)
+      output.register()
+      ssc.start()
+      eventually(timeout(10 seconds), interval(10 millis)) {
+        // If preferredLocations is set correctly, receiverTaskLocality should be NODE_LOCAL
+        assert(receiverTaskLocality === TaskLocality.NODE_LOCAL)
+      }
+    }
+  }
+
 }
 
 /** An input DStream with for testing rate controlling */
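Why the test expects NODE_LOCAL rather than PROCESS_LOCAL: hostname-only preferred locations pin a task to a node, not to a particular executor process. A short hedged sketch (object name is illustrative) printing Spark's TaskLocality enumeration, ordered from best to worst locality:

import org.apache.spark.scheduler.TaskLocality

object LocalityLevels {
  def main(args: Array[String]): Unit = {
    // Ordered best to worst: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    println(TaskLocality.values.toSeq.mkString(", "))
  }
}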