Commit f023aa2f authored by zsxwing, committed by Tathagata Das

[SPARK-10137] [STREAMING] Avoid restarting receivers if scheduleReceivers returns balanced results

This PR fixes the following cases for `ReceiverSchedulingPolicy`.

1) Assume there are 4 executors: host1, host2, host3, host4, and 5 receivers: r1, r2, r3, r4, r5. Then `ReceiverSchedulingPolicy.scheduleReceivers` will return (r1 -> host1, r2 -> host2, r3 -> host3, r4 -> host4, r5 -> host1).
Let's assume r1 starts first on `host1` as `scheduleReceivers` suggested, and tries to register with ReceiverTracker. But the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will return (host2, host3, host4) according to the current executor weights (host1 -> 1.0, host2 -> 0.5, host3 -> 0.5, host4 -> 0.5), so ReceiverTracker will reject `r1`. This is unexpected, since r1 is starting exactly where `scheduleReceivers` suggested.

This case can be fixed by ignoring, in `receiverTrackingInfoMap`, the information of the receiver that is being rescheduled when computing executor weights; a sketch of the effect on the weights follows.
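For illustration, here is a minimal, self-contained sketch of the weight computation (not part of the patch; the receiver set, the weight values, and the `exclude` parameter are invented to mirror the example above — in the patch itself the exclusion is achieved by marking the restarting receiver INACTIVE and clearing its scheduled executors):

```scala
object Case1Sketch {
  // Per-receiver weight contributions, mirroring the example above: r1 has just
  // started on host1 (1.0); r2..r4 are scheduled but not yet running (0.5 each).
  val contributions = Map(
    1 -> Map("host1" -> 1.0),
    2 -> Map("host2" -> 0.5),
    3 -> Map("host3" -> 0.5),
    4 -> Map("host4" -> 0.5))

  // Sum the contributions per executor, optionally ignoring one receiver's entry.
  def weights(exclude: Option[Int]): Map[String, Double] =
    contributions
      .filter { case (id, _) => !exclude.contains(id) }
      .values.flatten
      .groupBy(_._1).mapValues(_.map(_._2).sum).toMap

  def main(args: Array[String]): Unit = {
    // Counting r1's own entry: host1 -> 1.0 while the others are 0.5, so the
    // minimum-weight executors are (host2, host3, host4) and r1 gets rejected.
    println(weights(exclude = None))
    // Ignoring r1's entry: host1 drops out of the weight map entirely (idle),
    // so starting r1 there is acceptable -- exactly where scheduleReceivers put it.
    println(weights(exclude = Some(1)))
  }
}
```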

2) Assume there are 3 executors (host1, host2, host3), each with 3 cores, and 3 receivers: r1, r2, r3. Assume r1 is running on host1. Now r2 is restarting; the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will always return (host1, host2, host3). So it's possible that r2 will be scheduled to host1 by TaskScheduler, and similarly for r3. In the end, it's possible that all 3 receivers are running on host1 while host2 and host3 are idle.

This issue can be fixed by returning only the executors that have the minimum weight, rather than always returning at least 3 executors, as sketched below.
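The following standalone sketch (not part of the patch; the names and weights are made up to match the scenario above) contrasts the old and new candidate selection:

```scala
object Case2Sketch {
  // r1 is running on host1, so host1 carries weight 1.0; host2/host3 are untracked.
  val executorWeights = Map("host1" -> 1.0)
  val executors = Seq("host1", "host2", "host3")

  def main(args: Array[String]): Unit = {
    val idleExecutors = executors.toSet -- executorWeights.keys // Set(host2, host3)

    // Old behavior: fewer than preferredNumExecutors (3) idle executors, so the
    // candidate list was padded with the best weighted options -- host1 included.
    // TaskScheduler could then still start the restarting r2 on the busy host1.
    val preferredNumExecutors = 3
    val sortedByWeight = executorWeights.toSeq.sortBy(_._2).map(_._1)
    val oldCandidates =
      (idleExecutors.toSeq ++ sortedByWeight).take(preferredNumExecutors)

    // New behavior: any idle executor wins outright, so only host2 and host3 are
    // candidates and r2 cannot land on host1.
    val newCandidates = idleExecutors.toSeq

    println(s"old: $oldCandidates, new: $newCandidates")
  }
}
```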

Author: zsxwing <zsxwing@gmail.com>

Closes #8340 from zsxwing/fix-receiver-scheduling.
parent d9c25dec
@@ -22,6 +22,36 @@ import scala.collection.mutable

 import org.apache.spark.streaming.receiver.Receiver

+/**
+ * A class that tries to schedule receivers evenly. There are two phases for scheduling receivers.
+ *
+ *  - The first phase is global scheduling when ReceiverTracker is starting and we need to
+ *    schedule all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at
+ *    this phase. It will try to distribute receivers evenly across the executors.
+ *    ReceiverTracker should update its receiverTrackingInfoMap according to the results of
+ *    `scheduleReceivers`. `ReceiverTrackingInfo.scheduledExecutors` for each receiver will be
+ *    set to an executor list that contains the scheduled locations. Then when a receiver is
+ *    starting, it will send a register request and `ReceiverTracker.registerReceiver` will be
+ *    called. In `ReceiverTracker.registerReceiver`, if a receiver's scheduled executor list is
+ *    set, it should check if the location of this receiver is one of the scheduled executors;
+ *    if not, the registration will be rejected.
+ *  - The second phase is local scheduling when a receiver is restarting. There are two cases
+ *    of receiver restarting:
+ *    - If a receiver is restarting because it was rejected due to a mismatch between its real
+ *      location and its scheduled executors, in other words, it failed to start in one of the
+ *      locations that `scheduleReceivers` suggested, `ReceiverTracker` should first choose the
+ *      executors that are still alive in the list of scheduled executors, then use them to
+ *      launch the receiver job.
+ *    - If a receiver is restarting without a scheduled executor list, or the executors in the
+ *      list are dead, `ReceiverTracker` should call `rescheduleReceiver`. If so, it should not
+ *      set `ReceiverTrackingInfo.scheduledExecutors` for this receiver; instead, it should
+ *      clear it. Then when this receiver is registering, we know this is local scheduling, and
+ *      `ReceiverTracker` should call `rescheduleReceiver` again to check whether the launching
+ *      location matches.
+ *
+ * In conclusion, we should make a global schedule, try to follow it as long as possible, and
+ * otherwise fall back to local scheduling.
+ */
 private[streaming] class ReceiverSchedulingPolicy {

   /**
@@ -102,8 +132,7 @@ private[streaming] class ReceiverSchedulingPolicy {

   /**
    * Return a list of candidate executors to run the receiver. If the list is empty, the caller can
-   * run this receiver in arbitrary executor. The caller can use `preferredNumExecutors` to require
-   * returning `preferredNumExecutors` executors if possible.
+   * run this receiver in arbitrary executor.
    *
    * This method tries to balance executors' load. Here is the approach to schedule executors
    * for a receiver.
@@ -122,9 +151,8 @@ private[streaming] class ReceiverSchedulingPolicy {
    *         If a receiver is scheduled to an executor but has not yet run, it contributes
    *         `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.</li>
    *     </ul>
-   *     At last, if there are more than `preferredNumExecutors` idle executors (weight = 0),
-   *     returns all idle executors. Otherwise, we only return `preferredNumExecutors` best options
-   *     according to the weights.
+   *     At last, if there are any idle executors (weight = 0), returns all idle executors.
+   *     Otherwise, returns the executors that have the minimum weight.
    *   </li>
    * </ol>
    *
@@ -134,8 +162,7 @@ private[streaming] class ReceiverSchedulingPolicy {
       receiverId: Int,
       preferredLocation: Option[String],
       receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
-      executors: Seq[String],
-      preferredNumExecutors: Int = 3): Seq[String] = {
+      executors: Seq[String]): Seq[String] = {
     if (executors.isEmpty) {
       return Seq.empty
     }
@@ -156,15 +183,18 @@ private[streaming] class ReceiverSchedulingPolicy {
       }
     }.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor

-    val idleExecutors = (executors.toSet -- executorWeights.keys).toSeq
-    if (idleExecutors.size >= preferredNumExecutors) {
-      // If there are more than `preferredNumExecutors` idle executors, return all of them
+    val idleExecutors = executors.toSet -- executorWeights.keys
+    if (idleExecutors.nonEmpty) {
       scheduledExecutors ++= idleExecutors
     } else {
-      // If there are less than `preferredNumExecutors` idle executors, return 3 best options
-      scheduledExecutors ++= idleExecutors
-      val sortedExecutors = executorWeights.toSeq.sortBy(_._2).map(_._1)
-      scheduledExecutors ++= (idleExecutors ++ sortedExecutors).take(preferredNumExecutors)
+      // There is no idle executor. So select all executors that have the minimum weight.
+      val sortedExecutors = executorWeights.toSeq.sortBy(_._2)
+      if (sortedExecutors.nonEmpty) {
+        val minWeight = sortedExecutors(0)._2
+        scheduledExecutors ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
+      } else {
+        // This should not happen since "executors" is not empty
+      }
     }
     scheduledExecutors.toSeq
   }
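To make the new contract concrete, here is a tiny, hypothetical usage sketch of the updated `rescheduleReceiver` signature (the setup values are invented; the policy instance and arguments mirror the updated suite at the end of the diff):

```scala
val policy = new ReceiverSchedulingPolicy()
// With an empty tracking map every executor is idle, so all of them come back
// as candidates; with an empty executor list, the result would be empty.
val candidates = policy.rescheduleReceiver(
  receiverId = 0,
  preferredLocation = None,
  receiverTrackingInfoMap = Map.empty,
  executors = Seq("host1", "host2"))
assert(candidates.toSet == Set("host1", "host2"))
```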
@@ -244,8 +244,21 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     }
     if (isTrackerStopping || isTrackerStopped) {
-      false
-    } else if (!scheduleReceiver(streamId).contains(hostPort)) {
+      return false
+    }
+
+    val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors
+    val acceptableExecutors = if (scheduledExecutors.nonEmpty) {
+      // This receiver is registering and it's scheduled by
+      // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledExecutors" to check it.
+      scheduledExecutors.get
+    } else {
+      // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so call
+      // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
+      scheduleReceiver(streamId)
+    }
+
+    if (!acceptableExecutors.contains(hostPort)) {
       // Refuse it since it's scheduled to a wrong executor
       false
     } else {
@@ -426,12 +439,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
           startReceiver(receiver, executors)
         }
     case RestartReceiver(receiver) =>
-      val scheduledExecutors = schedulingPolicy.rescheduleReceiver(
-        receiver.streamId,
-        receiver.preferredLocation,
-        receiverTrackingInfos,
-        getExecutors)
-      updateReceiverScheduledExecutors(receiver.streamId, scheduledExecutors)
+      // Old scheduled executors minus the ones that are not active any more
+      val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
+      val scheduledExecutors = if (oldScheduledExecutors.nonEmpty) {
+        // Try global scheduling again
+        oldScheduledExecutors
+      } else {
+        val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
+        // Clear "scheduledExecutors" to indicate we are going to do local scheduling
+        val newReceiverInfo = oldReceiverInfo.copy(
+          state = ReceiverState.INACTIVE, scheduledExecutors = None)
+        receiverTrackingInfos(receiver.streamId) = newReceiverInfo
+        schedulingPolicy.rescheduleReceiver(
+          receiver.streamId,
+          receiver.preferredLocation,
+          receiverTrackingInfos,
+          getExecutors)
+      }
+      // Assume there is one receiver restarting at one time, so we don't need to update
+      // receiverTrackingInfos
       startReceiver(receiver, scheduledExecutors)
     case c: CleanupOldBlocks =>
       receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
@@ -464,6 +490,24 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       context.reply(true)
   }

+  /**
+   * Return the stored scheduled executors that are still alive.
+   */
+  private def getStoredScheduledExecutors(receiverId: Int): Seq[String] = {
+    if (receiverTrackingInfos.contains(receiverId)) {
+      val scheduledExecutors = receiverTrackingInfos(receiverId).scheduledExecutors
+      if (scheduledExecutors.nonEmpty) {
+        val executors = getExecutors.toSet
+        // Only return the alive executors
+        scheduledExecutors.get.filter(executors)
+      } else {
+        Nil
+      }
+    } else {
+      Nil
+    }
+  }
+
   /**
    * Start a receiver along with its scheduled executors
    */
@@ -484,7 +528,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

     // Function to start the receiver on the worker node
-    val startReceiverFunc = new StartReceiverFunc(checkpointDirOption, serializableHadoopConf)
+    val startReceiverFunc: Iterator[Receiver[_]] => Unit =
+      (iterator: Iterator[Receiver[_]]) => {
+        if (!iterator.hasNext) {
+          throw new SparkException(
+            "Could not start receiver as object not found.")
+        }
+        if (TaskContext.get().attemptNumber() == 0) {
+          val receiver = iterator.next()
+          assert(iterator.hasNext == false)
+          val supervisor = new ReceiverSupervisorImpl(
+            receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
+          supervisor.start()
+          supervisor.awaitTermination()
+        } else {
+          // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
+        }
+      }

     // Create the RDD using the scheduledExecutors to run the receiver in a Spark job
     val receiverRDD: RDD[Receiver[_]] =
@@ -541,31 +601,3 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     }
   }
 }
-
-/**
- * Function to start the receiver on the worker node. Use a class instead of closure to avoid
- * the serialization issue.
- */
-private[streaming] class StartReceiverFunc(
-    checkpointDirOption: Option[String],
-    serializableHadoopConf: SerializableConfiguration)
-  extends (Iterator[Receiver[_]] => Unit) with Serializable {
-
-  override def apply(iterator: Iterator[Receiver[_]]): Unit = {
-    if (!iterator.hasNext) {
-      throw new SparkException(
-        "Could not start receiver as object not found.")
-    }
-    if (TaskContext.get().attemptNumber() == 0) {
-      val receiver = iterator.next()
-      assert(iterator.hasNext == false)
-      val supervisor = new ReceiverSupervisorImpl(
-        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
-      supervisor.start()
-      supervisor.awaitTermination()
-    } else {
-      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
-    }
-  }
-}
@@ -39,7 +39,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
     assert(scheduledExecutors.toSet === Set("host1", "host2"))
   }

-  test("rescheduleReceiver: return all idle executors if more than 3 idle executors") {
+  test("rescheduleReceiver: return all idle executors if there are any idle executors") {
     val executors = Seq("host1", "host2", "host3", "host4", "host5")
     // host3 is idle
     val receiverTrackingInfoMap = Map(
@@ -49,16 +49,16 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
     assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5"))
   }

-  test("rescheduleReceiver: return 3 best options if less than 3 idle executors") {
+  test("rescheduleReceiver: return all executors that have minimum weight if no idle executors") {
     val executors = Seq("host1", "host2", "host3", "host4", "host5")
-    // Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0
-    // host4 and host5 are idle
+    // Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0, host4 = 0.5, host5 = 0.5
     val receiverTrackingInfoMap = Map(
       0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
       1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None),
-      2 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None))
+      2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None),
+      3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED, Some(Seq("host4", "host5")), None))
     val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
-      3, None, receiverTrackingInfoMap, executors)
+      4, None, receiverTrackingInfoMap, executors)
     assert(scheduledExecutors.toSet === Set("host2", "host4", "host5"))
   }
@@ -127,4 +127,5 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
       assert(executors.isEmpty)
     }
   }
 }