diff --git a/conf/java-opts b/conf/java-opts
index e8b1255f826c74000be3bb2625100984640b269c..20a2ade45cc30b0cb2833f4683e919734dfcae07 100644
--- a/conf/java-opts
+++ b/conf/java-opts
@@ -1 +1 @@
--Dspark.broadcast.MasterHostAddress=127.0.0.1 -Dspark.broadcast.MasterTrackerPort=22222 -Dspark.broadcast.BlockSize=256 -Dspark.broadcast.MaxRetryCount=2 -Dspark.broadcast.TrackerSocketTimeout=50000 -Dspark.broadcast.ServerSocketTimeout=10000 -Dspark.broadcast.DualMode=false
+-Dspark.broadcast.MasterHostAddress=127.0.0.1 -Dspark.broadcast.MasterTrackerPort=22222 -Dspark.broadcast.BlockSize=256 -Dspark.broadcast.MaxRetryCount=2 -Dspark.broadcast.TrackerSocketTimeout=50000 -Dspark.broadcast.ServerSocketTimeout=10000
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 3f002d8aeccefe8e4864f5cd33e583ee175bb00d..08648d2ef4c612e35bd15627ce88eecbada7d7ef 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -10,11 +10,6 @@ import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
 
 import scala.collection.mutable.{Map, Set}
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
-
-import spark.compress.lzf.{LZFInputStream, LZFOutputStream}
-
 @serializable
 trait Broadcast {
   val uuid = UUID.randomUUID
@@ -63,12 +58,12 @@ extends Broadcast with Logging {
     sendBroadcast 
   }
 
-  def sendBroadcast () {    
+  def sendBroadcast (): Unit = {
     logInfo ("Local host address: " + hostAddress)
 
     // Store a persistent copy in HDFS    
     // TODO: Turned OFF for now
-    // val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
+    // val out = new ObjectOutputStream (DfsBroadcast.openFileForWriting(uuid))
     // out.writeObject (value_)
     // out.close    
     // TODO: Fix this at some point  
@@ -104,12 +99,6 @@ extends Broadcast with Logging {
     val masterSource_0 = 
       SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes, 0) 
     pqOfSources.add (masterSource_0)
-    // Add one more time to have two replicas of any seeds in the PQ
-    if (BroadcastCS.DualMode) {
-      val masterSource_1 = 
-        SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes, 1) 
-      pqOfSources.add (masterSource_1)
-    }      
 
     // Register with the Tracker
     while (guidePort == -1) { 
@@ -120,7 +109,7 @@ extends Broadcast with Logging {
     BroadcastCS.registerValue (uuid, guidePort)  
   }
   
-  private def readObject (in: ObjectInputStream) {
+  private def readObject (in: ObjectInputStream): Unit = {
     in.defaultReadObject
     BroadcastCS.synchronized {
       val cachedVal = BroadcastCS.values.get (uuid)
@@ -145,7 +134,7 @@ extends Broadcast with Logging {
           value_ = unBlockifyObject[T]
           BroadcastCS.values.put (uuid, value_)
         }  else {
-          val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid))
+          val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid))
           value_ = fileIn.readObject.asInstanceOf[T]
           BroadcastCS.values.put(uuid, value_)
           fileIn.close
@@ -157,7 +146,7 @@ extends Broadcast with Logging {
     }
   }
   
-  private def initializeSlaveVariables = {
+  private def initializeSlaveVariables: Unit = {
     arrayOfBlocks = null
     totalBytes = -1
     totalBlocks = -1
@@ -334,7 +323,6 @@ extends Broadcast with Logging {
       if (!receptionSucceeded) { 
         sourceInfo.receptionFailed = true 
       }
-      sourceInfo.MBps = (sourceInfo.totalBytes.toDouble / 1048576) / time
 
       // Send back statistics to the Master
       oosMaster.writeObject (sourceInfo) 
@@ -419,7 +407,7 @@ extends Broadcast with Logging {
     // Keep track of sources that have completed reception
     private var setOfCompletedSources = Set[SourceInfo] ()
   
-    override def run = {
+    override def run: Unit = {
       var threadPool = BroadcastCS.newDaemonCachedThreadPool
       var serverSocket: ServerSocket = null
 
@@ -477,7 +465,7 @@ extends Broadcast with Logging {
       threadPool.shutdown      
     }
     
-    private def sendStopBroadcastNotifications = {
+    private def sendStopBroadcastNotifications: Unit = {
       pqOfSources.synchronized {
         var pqIter = pqOfSources.iterator        
         while (pqIter.hasNext) {
@@ -529,7 +517,7 @@ extends Broadcast with Logging {
       private var selectedSourceInfo: SourceInfo = null
       private var thisWorkerInfo:SourceInfo = null
       
-      override def run = {
+      override def run: Unit = {
         try {
           logInfo ("new GuideSingleRequest is running")
           // Connecting worker is sending in its hostAddress and listenPort it will 
@@ -568,20 +556,9 @@ extends Broadcast with Logging {
               setOfCompletedSources += thisWorkerInfo
                             
               selectedSourceInfo.currentLeechers -= 1
-              selectedSourceInfo.MBps = sourceInfo.MBps 
               
               // Put it back 
               pqOfSources.add (selectedSourceInfo)
-              
-              // Update global source speed statistics
-              BroadcastCS.setSourceSpeed (
-                sourceInfo.hostAddress, sourceInfo.MBps)
-
-              // No need to find and update thisWorkerInfo, but add its replica
-              if (BroadcastCS.DualMode) {
-                pqOfSources.add (SourceInfo (thisWorkerInfo.hostAddress, 
-                  thisWorkerInfo.listenPort, totalBlocks, totalBytes, 1))
-              }              
             }                        
           }      
         } catch {
@@ -635,7 +612,7 @@ extends Broadcast with Logging {
 
   class ServeMultipleRequests
   extends Thread with Logging {
-    override def run = {
+    override def run: Unit = {
       var threadPool = BroadcastCS.newDaemonCachedThreadPool
       var serverSocket: ServerSocket = null
 
@@ -688,7 +665,7 @@ extends Broadcast with Logging {
       private var sendFrom = 0
       private var sendUntil = totalBlocks
       
-      override def run  = {
+      override def run: Unit = {
         try {
           logInfo ("new ServeSingleRequest is running")
           
@@ -719,7 +696,7 @@ extends Broadcast with Logging {
         }
       }
 
-      private def sendObject = {
+      private def sendObject: Unit = {
         // Wait till receiving the SourceInfo from Master
         while (totalBlocks == -1) { 
           totalBlocksLock.synchronized {
@@ -748,49 +725,6 @@ extends Broadcast with Logging {
   }  
 }
 
-@serializable 
-class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean) 
-extends Broadcast with Logging {
-  
-  def value = value_
-
-  BroadcastCH.synchronized { 
-    BroadcastCH.values.put(uuid, value_) 
-  }
-
-  if (!local) { 
-    sendBroadcast 
-  }
-
-  def sendBroadcast () {
-    val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
-    out.writeObject (value_)
-    out.close
-  }
-
-  // Called by Java when deserializing an object
-  private def readObject(in: ObjectInputStream) {
-    in.defaultReadObject
-    BroadcastCH.synchronized {
-      val cachedVal = BroadcastCH.values.get(uuid)
-      if (cachedVal != null) {
-        value_ = cachedVal.asInstanceOf[T]
-      } else {
-        logInfo( "Started reading Broadcasted variable " + uuid)
-        val start = System.nanoTime
-        
-        val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid))
-        value_ = fileIn.readObject.asInstanceOf[T]
-        BroadcastCH.values.put(uuid, value_)
-        fileIn.close
-        
-        val time = (System.nanoTime - start) / 1e9
-        logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s")
-      }
-    }
-  }
-}
-
 @serializable
 case class SourceInfo (val hostAddress: String, val listenPort: Int, 
   val totalBlocks: Int, val totalBytes: Int, val replicaID: Int)  
@@ -798,7 +732,6 @@ extends Comparable [SourceInfo] with Logging {
 
   var currentLeechers = 0
   var receptionFailed = false
-  var MBps: Double = BroadcastCS.MaxMBps
   
   var hasBlocks = 0
   
@@ -831,11 +764,11 @@ extends Logging {
 
   // Will be called by SparkContext or Executor before using Broadcast
   // Calls all other initializers here
-  def initialize (isMaster: Boolean) {
+  def initialize (isMaster: Boolean): Unit = {
     synchronized {
       if (!initialized) {
-        // Initialization for CentralizedHDFSBroadcast
-        BroadcastCH.initialize 
+        // Initialization for DfsBroadcast
+        DfsBroadcast.initialize 
         // Initialization for ChainedStreamingBroadcast
         BroadcastCS.initialize (isMaster)
         
@@ -851,8 +784,6 @@ extends Logging {
 
   var valueToGuidePortMap = Map[UUID, Int] ()
   
-  var sourceToSpeedMap = Map[String, Double] ()
-
   // Random number generator
   var ranGen = new Random
 
@@ -867,19 +798,12 @@ extends Logging {
   private var TrackerSocketTimeout_ : Int = 50000
   private var ServerSocketTimeout_ : Int = 10000
 
-  private var DualMode_ : Boolean = false
- 
   private var trackMV: TrackMultipleValues = null
 
-  // newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * curSpeed
-  private val ALPHA = 0.7
-  // 125.0 MBps = 1 Gbps link
-  private val MaxMBps_ = 125.0 
-
   private var MinKnockInterval_ = 500
   private var MaxKnockInterval_ = 999
 
-  def initialize (isMaster__ : Boolean) {
+  def initialize (isMaster__ : Boolean): Unit = {
     synchronized {
       if (!initialized) {
         MasterHostAddress_ = 
@@ -901,9 +825,6 @@ extends Logging {
         MaxKnockInterval_ =
           System.getProperty ("spark.broadcast.MaxKnockInterval", "999").toInt
 
-        DualMode_ = 
-          System.getProperty ("spark.broadcast.DualMode", "false").toBoolean          
-
         isMaster_ = isMaster__        
                   
         if (isMaster) {
@@ -926,23 +847,19 @@ extends Logging {
   def TrackerSocketTimeout = TrackerSocketTimeout_
   def ServerSocketTimeout = ServerSocketTimeout_
 
-  def DualMode = DualMode_
-
   def isMaster = isMaster_ 
   
   def MinKnockInterval = MinKnockInterval_
   def MaxKnockInterval = MaxKnockInterval_
 
-  def MaxMBps = MaxMBps_
-  
-  def registerValue (uuid: UUID, guidePort: Int) = {
+  def registerValue (uuid: UUID, guidePort: Int): Unit = {
     valueToGuidePortMap.synchronized {    
       valueToGuidePortMap += (uuid -> guidePort)
       logInfo ("New value registered with the Tracker " + valueToGuidePortMap)             
     }
   }
   
-  def unregisterValue (uuid: UUID) = {
+  def unregisterValue (uuid: UUID): Unit = {
     valueToGuidePortMap.synchronized {
       valueToGuidePortMap (uuid) = SourceInfo.TxOverGoToHDFS
       logInfo ("Value unregistered from the Tracker " + valueToGuidePortMap)             
@@ -980,23 +897,9 @@ extends Logging {
     return threadPool
   }
 
-  def getSourceSpeed (hostAddress: String): Double = {
-    sourceToSpeedMap.synchronized {
-      sourceToSpeedMap.getOrElseUpdate(hostAddress, MaxMBps)
-    }
-  }
-  
-  def setSourceSpeed (hostAddress: String, MBps: Double) = {
-    sourceToSpeedMap.synchronized {
-      var oldSpeed = sourceToSpeedMap.getOrElseUpdate(hostAddress, MaxMBps)
-      var newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * MBps
-      sourceToSpeedMap.update (hostAddress, newSpeed)
-    }
-  }
-  
   class TrackMultipleValues
   extends Thread with Logging {
-    override def run = {
+    override def run: Unit = {
       var threadPool = BroadcastCS.newDaemonCachedThreadPool
       var serverSocket: ServerSocket = null
       
@@ -1018,7 +921,7 @@ extends Logging {
           if (clientSocket != null) {
             try {            
               threadPool.execute (new Thread {
-                override def run = {
+                override def run: Unit = {
                   val oos = new ObjectOutputStream (clientSocket.getOutputStream)
                   oos.flush
                   val ois = new ObjectInputStream (clientSocket.getInputStream)
@@ -1056,67 +959,3 @@ extends Logging {
     }
   }
 }
-
-private object BroadcastCH
-extends Logging {
-  val values = new MapMaker ().softValues ().makeMap[UUID, Any]
-
-  private var initialized = false
-
-  private var fileSystem: FileSystem = null
-  private var workDir: String = null
-  private var compress: Boolean = false
-  private var bufferSize: Int = 65536
-
-  def initialize () {
-    synchronized {
-      if (!initialized) {
-        bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-        val dfs = System.getProperty("spark.dfs", "file:///")
-        if (!dfs.startsWith("file://")) {
-          val conf = new Configuration()
-          conf.setInt("io.file.buffer.size", bufferSize)
-          val rep = System.getProperty("spark.dfs.replication", "3").toInt
-          conf.setInt("dfs.replication", rep)
-          fileSystem = FileSystem.get(new URI(dfs), conf)
-        }
-        workDir = System.getProperty("spark.dfs.workdir", "/tmp")
-        compress = System.getProperty("spark.compress", "false").toBoolean
-
-        initialized = true
-      }
-    }
-  }
-
-  private def getPath(uuid: UUID) = new Path(workDir + "/broadcast-" + uuid)
-
-  def openFileForReading(uuid: UUID): InputStream = {
-    val fileStream = if (fileSystem != null) {
-      fileSystem.open(getPath(uuid))
-    } else {
-      // Local filesystem
-      new FileInputStream(getPath(uuid).toString)
-    }
-    if (compress)
-      new LZFInputStream(fileStream) // LZF stream does its own buffering
-    else if (fileSystem == null)
-      new BufferedInputStream(fileStream, bufferSize)
-    else
-      fileStream // Hadoop streams do their own buffering
-  }
-
-  def openFileForWriting(uuid: UUID): OutputStream = {
-    val fileStream = if (fileSystem != null) {
-      fileSystem.create(getPath(uuid))
-    } else {
-      // Local filesystem
-      new FileOutputStream(getPath(uuid).toString)
-    }
-    if (compress)
-      new LZFOutputStream(fileStream) // LZF stream does its own buffering
-    else if (fileSystem == null)
-      new BufferedOutputStream(fileStream, bufferSize)
-    else
-      fileStream // Hadoop streams do their own buffering
-  }
-}
diff --git a/src/scala/spark/DfsBroadcast.scala b/src/scala/spark/DfsBroadcast.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5be5f98e8cad561355045b5963aca2b712b1f2be
--- /dev/null
+++ b/src/scala/spark/DfsBroadcast.scala
@@ -0,0 +1,127 @@
+package spark
+
+import com.google.common.collect.MapMaker
+
+import java.io._
+import java.net._
+import java.util.UUID
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
+
+import spark.compress.lzf.{LZFInputStream, LZFOutputStream}
+
+@serializable 
+class DfsBroadcast[T](@transient var value_ : T, local: Boolean) 
+extends Broadcast with Logging {
+  
+  def value = value_
+
+  DfsBroadcast.synchronized { 
+    DfsBroadcast.values.put(uuid, value_) 
+  }
+
+  if (!local) { 
+    sendBroadcast 
+  }
+
+  def sendBroadcast (): Unit = {
+    val out = new ObjectOutputStream (DfsBroadcast.openFileForWriting(uuid))
+    out.writeObject (value_)
+    out.close
+  }
+
+  // Called by JVM when deserializing an object
+  private def readObject(in: ObjectInputStream): Unit = {
+    in.defaultReadObject
+    DfsBroadcast.synchronized {
+      val cachedVal = DfsBroadcast.values.get(uuid)
+      if (cachedVal != null) {
+        value_ = cachedVal.asInstanceOf[T]
+      } else {
+        logInfo( "Started reading Broadcasted variable " + uuid)
+        val start = System.nanoTime
+        
+        val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid))
+        value_ = fileIn.readObject.asInstanceOf[T]
+        DfsBroadcast.values.put(uuid, value_)
+        fileIn.close
+        
+        val time = (System.nanoTime - start) / 1e9
+        logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s")
+      }
+    }
+  }
+}
+
+private object DfsBroadcast
+extends Logging {
+  val values = new MapMaker ().softValues ().makeMap[UUID, Any]
+
+  private var initialized = false
+
+  private var fileSystem: FileSystem = null
+  private var workDir: String = null
+  private var compress: Boolean = false
+  private var bufferSize: Int = 65536
+
+  def initialize (): Unit = {
+    synchronized {
+      if (!initialized) {
+        bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+        val dfs = System.getProperty("spark.dfs", "file:///")
+        if (!dfs.startsWith("file://")) {
+          val conf = new Configuration()
+          conf.setInt("io.file.buffer.size", bufferSize)
+          val rep = System.getProperty("spark.dfs.replication", "3").toInt
+          conf.setInt("dfs.replication", rep)
+          fileSystem = FileSystem.get(new URI(dfs), conf)
+        }
+        workDir = System.getProperty("spark.dfs.workdir", "/tmp")
+        compress = System.getProperty("spark.compress", "false").toBoolean
+
+        initialized = true
+      }
+    }
+  }
+
+  private def getPath(uuid: UUID) = new Path(workDir + "/broadcast-" + uuid)
+
+  def openFileForReading(uuid: UUID): InputStream = {
+    val fileStream = if (fileSystem != null) {
+      fileSystem.open(getPath(uuid))
+    } else {
+      // Local filesystem
+      new FileInputStream(getPath(uuid).toString)
+    }
+    
+    if (compress) {
+      // LZF stream does its own buffering
+      new LZFInputStream(fileStream) 
+    } else if (fileSystem == null) {
+      new BufferedInputStream(fileStream, bufferSize)
+    } else { 
+      // Hadoop streams do their own buffering
+      fileStream 
+    }
+  }
+
+  def openFileForWriting(uuid: UUID): OutputStream = {
+    val fileStream = if (fileSystem != null) {
+      fileSystem.create(getPath(uuid))
+    } else {
+      // Local filesystem
+      new FileOutputStream(getPath(uuid).toString)
+    }
+    
+    if (compress) {
+      // LZF stream does its own buffering
+      new LZFOutputStream(fileStream) 
+    } else if (fileSystem == null) {
+      new BufferedOutputStream(fileStream, bufferSize)
+    } else {
+      // Hadoop streams do their own buffering
+      fileStream 
+    }
+  }
+}
diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala
index 216ea4c0a926e8229ac1708ac94967e50091e7ad..75efd9d1fb952a46d3813c67a85090e3ce2c27b6 100644
--- a/src/scala/spark/SparkContext.scala
+++ b/src/scala/spark/SparkContext.scala
@@ -19,7 +19,7 @@ class SparkContext(master: String, frameworkName: String) extends Logging {
     new Accumulator(initialValue, param)
 
   // TODO: Keep around a weak hash map of values to Cached versions?
-  // def broadcast[T](value: T) = new CentralizedHDFSBroadcast(value, local)
+  // def broadcast[T](value: T) = new DfsBroadcast(value, local)
   def broadcast[T](value: T) = new ChainedStreamingBroadcast(value, local)
 
   def textFile(path: String) = new HdfsTextFile(this, path)