Skip to content
Snippets Groups Projects
Commit 0a5c24ae authored by Mosharaf Chowdhury
Browse files

 - Default broadcast mechanism is set to DfsBroadcast
 - Configuration parameters are renamed to follow our convention
 - Master now automatically supplies its hostAddress instead of reading from config file
 - sendBroadcast has been removed from the Broadcast trait
parent 06dc4a51
No related branches found
No related tags found
No related merge requests found
......@@ -1047,7 +1047,7 @@ extends Logging {
private var initialized = false
private var isMaster_ = false
private var MasterHostAddress_ = "127.0.0.1"
private var MasterHostAddress_ = InetAddress.getLocalHost.getHostAddress
private var MasterTrackerPort_ : Int = 11111
private var BlockSize_ : Int = 512 * 1024
private var MaxRetryCount_ : Int = 2
......@@ -1079,40 +1079,38 @@ extends Logging {
def initialize (isMaster__ : Boolean): Unit = {
synchronized {
if (!initialized) {
MasterHostAddress_ =
System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1")
MasterTrackerPort_ =
System.getProperty ("spark.broadcast.MasterTrackerPort", "11111").toInt
System.getProperty ("spark.broadcast.masterTrackerPort", "11111").toInt
BlockSize_ =
System.getProperty ("spark.broadcast.BlockSize", "512").toInt * 1024
System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024
MaxRetryCount_ =
System.getProperty ("spark.broadcast.MaxRetryCount", "2").toInt
System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt
TrackerSocketTimeout_ =
System.getProperty ("spark.broadcast.TrackerSocketTimeout", "50000").toInt
System.getProperty ("spark.broadcast.trackerSocketTimeout", "50000").toInt
ServerSocketTimeout_ =
System.getProperty ("spark.broadcast.ServerSocketTimeout", "10000").toInt
System.getProperty ("spark.broadcast.serverSocketTimeout", "10000").toInt
MinKnockInterval_ =
System.getProperty ("spark.broadcast.MinKnockInterval", "500").toInt
System.getProperty ("spark.broadcast.minKnockInterval", "500").toInt
MaxKnockInterval_ =
System.getProperty ("spark.broadcast.MaxKnockInterval", "999").toInt
System.getProperty ("spark.broadcast.maxKnockInterval", "999").toInt
MaxPeersInGuideResponse_ =
System.getProperty ("spark.broadcast.MaxPeersInGuideResponse", "4").toInt
System.getProperty ("spark.broadcast.maxPeersInGuideResponse", "4").toInt
MaxRxPeers_ =
System.getProperty ("spark.broadcast.MaxRxPeers", "4").toInt
System.getProperty ("spark.broadcast.maxRxPeers", "4").toInt
MaxTxPeers_ =
System.getProperty ("spark.broadcast.MaxTxPeers", "4").toInt
System.getProperty ("spark.broadcast.maxTxPeers", "4").toInt
MaxChatTime_ =
System.getProperty ("spark.broadcast.MaxChatTime", "250").toInt
System.getProperty ("spark.broadcast.maxChatTime", "250").toInt
MaxChatBlocks_ =
System.getProperty ("spark.broadcast.MaxChatBlocks", "1024").toInt
System.getProperty ("spark.broadcast.maxChatBlocks", "1024").toInt
EndGameFraction_ =
System.getProperty ("spark.broadcast.EndGameFraction", "1.0").toDouble
System.getProperty ("spark.broadcast.endGameFraction", "1.0").toDouble
isMaster_ = isMaster__
......
......@@ -12,8 +12,6 @@ trait Broadcast[T] {
// We cannot have an abstract readObject here due to some weird issues with
// readObject having to be 'private' in sub-classes. Possibly a Scala bug!
def sendBroadcast: Unit
override def toString = "spark.Broadcast(" + uuid + ")"
}
......@@ -30,11 +28,12 @@ extends Logging {
// Called by SparkContext or Executor before using Broadcast
def initialize (isMaster: Boolean): Unit = synchronized {
if (!initialized) {
val broadcastFactoryClass = System.getProperty("spark.broadcast.Factory",
"spark.BitTorrentBroadcastFactory")
val broadcastFactoryClass = System.getProperty("spark.broadcast.factory",
"spark.DfsBroadcastFactory")
val booleanArgs = Array[AnyRef] (isMaster.asInstanceOf[AnyRef])
// broadcastFactory = Class.forName(broadcastFactoryClass).getConstructors()(0).newInstance(booleanArgs:_*).asInstanceOf[BroadcastFactory]
broadcastFactory = Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
broadcastFactory =
Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
// Initialize appropriate BroadcastFactory and BroadcastObject
broadcastFactory.initialize(isMaster)
......
......@@ -729,7 +729,7 @@ extends Logging {
private var initialized = false
private var isMaster_ = false
private var MasterHostAddress_ = "127.0.0.1"
private var MasterHostAddress_ = InetAddress.getLocalHost.getHostAddress
private var MasterTrackerPort_ : Int = 22222
private var BlockSize_ : Int = 512 * 1024
private var MaxRetryCount_ : Int = 2
......@@ -745,24 +745,22 @@ extends Logging {
def initialize (isMaster__ : Boolean): Unit = {
synchronized {
if (!initialized) {
MasterHostAddress_ =
System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1")
MasterTrackerPort_ =
System.getProperty ("spark.broadcast.MasterTrackerPort", "22222").toInt
System.getProperty ("spark.broadcast.masterTrackerPort", "22222").toInt
BlockSize_ =
System.getProperty ("spark.broadcast.BlockSize", "512").toInt * 1024
System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024
MaxRetryCount_ =
System.getProperty ("spark.broadcast.MaxRetryCount", "2").toInt
System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt
TrackerSocketTimeout_ =
System.getProperty ("spark.broadcast.TrackerSocketTimeout", "50000").toInt
System.getProperty ("spark.broadcast.trackerSocketTimeout", "50000").toInt
ServerSocketTimeout_ =
System.getProperty ("spark.broadcast.ServerSocketTimeout", "10000").toInt
System.getProperty ("spark.broadcast.serverSocketTimeout", "10000").toInt
MinKnockInterval_ =
System.getProperty ("spark.broadcast.MinKnockInterval", "500").toInt
System.getProperty ("spark.broadcast.minKnockInterval", "500").toInt
MaxKnockInterval_ =
System.getProperty ("spark.broadcast.MaxKnockInterval", "999").toInt
System.getProperty ("spark.broadcast.maxKnockInterval", "999").toInt
isMaster_ = isMaster__
......
......@@ -82,7 +82,7 @@ extends Logging {
conf.setInt("dfs.replication", rep)
fileSystem = FileSystem.get(new URI(dfs), conf)
}
workDir = System.getProperty("spark.dfs.workdir", "/tmp")
workDir = System.getProperty("spark.dfs.workDir", "/tmp")
compress = System.getProperty("spark.compress", "false").toBoolean
initialized = true
......
0% loaded. Loading, or try again.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment