Skip to content
Snippets Groups Projects
Commit 815de540 authored by YanTangZhai's avatar YanTangZhai Committed by Josh Rosen
Browse files

[SPARK-4946] [CORE] Using AkkaUtils.askWithReply in...

[SPARK-4946] [CORE] Using AkkaUtils.askWithReply in MapOutputTracker.askTracker to reduce the chance of the communicating problem

Using AkkaUtils.askWithReply in MapOutputTracker.askTracker to reduce the chance of the communicating problem

Author: YanTangZhai <hakeemzhai@tencent.com>
Author: yantangzhai <tyz0303@163.com>

Closes #3785 from YanTangZhai/SPARK-4946 and squashes the following commits:

9ca6541 [yantangzhai] [SPARK-4946] [CORE] Using AkkaUtils.askWithReply in MapOutputTracker.askTracker to reduce the chance of the communicating problem
e4c2c0a [YanTangZhai] Merge pull request #15 from apache/master
718afeb [YanTangZhai] Merge pull request #12 from apache/master
6e643f8 [YanTangZhai] Merge pull request #11 from apache/master
e249846 [YanTangZhai] Merge pull request #10 from apache/master
d26d982 [YanTangZhai] Merge pull request #9 from apache/master
76d4027 [YanTangZhai] Merge pull request #8 from apache/master
03b62b0 [YanTangZhai] Merge pull request #7 from apache/master
8a00106 [YanTangZhai] Merge pull request #6 from apache/master
cbcba66 [YanTangZhai] Merge pull request #3 from apache/master
cdef539 [YanTangZhai] Merge pull request #1 from apache/master
parent 4cef05e1
No related branches found
No related tags found
No related merge requests found
......@@ -76,6 +76,8 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
*/
private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging {
private val timeout = AkkaUtils.askTimeout(conf)
private val retryAttempts = AkkaUtils.numRetries(conf)
private val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
/** Set to the MapOutputTrackerActor living on the driver. */
var trackerActor: ActorRef = _
......@@ -108,8 +110,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
*/
protected def askTracker(message: Any): Any = {
try {
val future = trackerActor.ask(message)(timeout)
Await.result(future, timeout)
AkkaUtils.askWithReply(message, trackerActor, retryAttempts, retryIntervalMs, timeout)
} catch {
case e: Exception =>
logError("Error communicating with MapOutputTracker", e)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment