From 5bf6369220570d739ab31301fc9f8e22244a5da5 Mon Sep 17 00:00:00 2001
From: Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>
Date: Thu, 13 Jan 2011 19:22:07 -0800
Subject: [PATCH] Added a tracker strategy that selects random mappers for
 reducers. This can be used to measure tracker overhead.

---
 conf/java-opts                               |  2 +-
 src/scala/spark/ShuffleTrackerStrategy.scala | 42 ++++++++++++++++++++
 2 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/conf/java-opts b/conf/java-opts
index 20eb4511f4..e52b9e8681 100644
--- a/conf/java-opts
+++ b/conf/java-opts
@@ -1,7 +1,7 @@
 -Dspark.shuffle.class=spark.TrackedCustomBlockedInMemoryShuffle
 -Dspark.shuffle.masterHostAddress=127.0.0.1
 -Dspark.shuffle.masterTrackerPort=22222 
--Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy 
+-Dspark.shuffle.trackerStrategy=spark.SelectRandomShuffleTrackerStrategy 
 -Dspark.shuffle.maxRxConnections=40
 -Dspark.shuffle.maxTxConnections=120 
 -Dspark.shuffle.blockSize=4096
diff --git a/src/scala/spark/ShuffleTrackerStrategy.scala b/src/scala/spark/ShuffleTrackerStrategy.scala
index ae06575fe8..cfc5fcc1b0 100644
--- a/src/scala/spark/ShuffleTrackerStrategy.scala
+++ b/src/scala/spark/ShuffleTrackerStrategy.scala
@@ -1,5 +1,7 @@
 package spark
 
+import java.util.Random
+
 import scala.util.Sorting._
 
 /**
@@ -92,6 +94,46 @@ extends ShuffleTrackerStrategy with Logging {
   }
 }
 
+/**
+ * A simple ShuffleTrackerStrategy that randomly selects a mapper for each reducer
+ */
+class SelectRandomShuffleTrackerStrategy
+extends ShuffleTrackerStrategy with Logging {
+  private var numMappers = -1
+  private var outputLocs: Array[SplitInfo] = null
+  
+  private var ranGen = new Random
+  
+  // The order of elements in the outputLocs (splitIndex) is used to pass 
+  // information back and forth between the tracker, mappers, and reducers
+  def initialize(outputLocs_ : Array[SplitInfo]): Unit = {
+    outputLocs = outputLocs_
+    numMappers = outputLocs.size
+  }
+  
+  def selectSplit(reducerSplitInfo: SplitInfo): Int = synchronized {
+    var splitIndex = -1
+    // NOTE(review): loops forever if the reducer already has every split — confirm callers never ask in that state
+    do {
+      splitIndex = ranGen.nextInt(numMappers)
+    } while (reducerSplitInfo.hasSplitsBitVector.get(splitIndex))
+    
+    logInfo("%d".format(splitIndex))
+    
+    return splitIndex
+  }
+  
+  def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndex: Int): Unit = synchronized {
+  }
+
+  def deleteReducerFrom(reducerSplitInfo: SplitInfo, 
+    receptionStat: ReceptionStats): Unit = synchronized {
+    // NOTE(review): nothing to clean up — this strategy keeps no per-reducer
+    // state; the assertion below references curConnectionsPerLoc, a field this
+    // assert(curConnectionsPerLoc(receptionStat.serverSplitIndex) >= 0)
+  }
+}
+
 /**
  * Shuffle tracker strategy that tries to balance the percentage of blocks 
  * remaining for each reducer
-- 
GitLab