Skip to content
Snippets Groups Projects
Commit 1231eb12 authored by Matei Zaharia
Browse files

Merge pull request #259 from mosharaf/bc-fix-dev

Synchronization bug fix in broadcast implementations
parents 4f1e0325 edc67bfb
No related branches found
No related tags found
No related merge requests found
......@@ -311,9 +311,11 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
while (hasBlocks.get < totalBlocks) {
var numThreadsToCreate =
math.min(listOfSources.size, MultiTracker.MaxChatSlots) -
var numThreadsToCreate = 0
listOfSources.synchronized {
numThreadsToCreate = math.min(listOfSources.size, MultiTracker.MaxChatSlots) -
threadPool.getActiveCount
}
while (hasBlocks.get < totalBlocks && numThreadsToCreate > 0) {
var peerToTalkTo = pickPeerToTalkToRandom
......@@ -726,7 +728,6 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
guidePortLock.synchronized { guidePortLock.notifyAll() }
try {
// Don't stop until there is a copy in HDFS
while (!stopBroadcast) {
var clientSocket: Socket = null
try {
......@@ -734,14 +735,17 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
clientSocket = serverSocket.accept()
} catch {
case e: Exception => {
logError("GuideMultipleRequests Timeout.")
// Stop broadcast if at least one worker has connected and
// everyone connected so far are done. Comparing with
// listOfSources.size - 1, because it includes the Guide itself
if (listOfSources.size > 1 &&
setOfCompletedSources.size == listOfSources.size - 1) {
stopBroadcast = true
listOfSources.synchronized {
setOfCompletedSources.synchronized {
if (listOfSources.size > 1 &&
setOfCompletedSources.size == listOfSources.size - 1) {
stopBroadcast = true
logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.")
}
}
}
}
}
......@@ -922,9 +926,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
serverSocket.setSoTimeout(MultiTracker.ServerSocketTimeout)
clientSocket = serverSocket.accept()
} catch {
case e: Exception => {
logError("ServeMultipleRequests Timeout.")
}
case e: Exception => { }
}
if (clientSocket != null) {
logDebug("Serve: Accepted new client connection:" + clientSocket)
......
......@@ -228,7 +228,7 @@ extends Logging {
var oosTracker: ObjectOutputStream = null
var oisTracker: ObjectInputStream = null
var gInfo: SourceInfo = SourceInfo("", SourceInfo.TxOverGoToDefault)
var gInfo: SourceInfo = SourceInfo("", SourceInfo.TxNotStartedRetry)
var retriesLeft = MultiTracker.MaxRetryCount
do {
......
......@@ -27,9 +27,10 @@ extends Comparable[SourceInfo] with Logging {
* Helper Object of SourceInfo for its constants
*/
private[spark] object SourceInfo {
// Constants for special values of listenPort
// Broadcast has not started yet! Should never happen.
val TxNotStartedRetry = -1
val TxOverGoToDefault = 0
// Broadcast has already finished. Try default mechanism.
val TxOverGoToDefault = -3
// Other constants
val StopBroadcast = -2
val UnusedParam = 0
......
......@@ -292,15 +292,17 @@ extends Broadcast[T](id) with Logging with Serializable {
clientSocket = serverSocket.accept
} catch {
case e: Exception => {
logError("GuideMultipleRequests Timeout.")
// Stop broadcast if at least one worker has connected and
// everyone connected so far are done.
// Comparing with listOfSources.size - 1, because the Guide itself
// is included
if (listOfSources.size > 1 &&
setOfCompletedSources.size == listOfSources.size - 1) {
stopBroadcast = true
// everyone connected so far are done. Comparing with
// listOfSources.size - 1, because it includes the Guide itself
listOfSources.synchronized {
setOfCompletedSources.synchronized {
if (listOfSources.size > 1 &&
setOfCompletedSources.size == listOfSources.size - 1) {
stopBroadcast = true
logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.")
}
}
}
}
}
......@@ -492,7 +494,7 @@ extends Broadcast[T](id) with Logging with Serializable {
serverSocket.setSoTimeout(MultiTracker.ServerSocketTimeout)
clientSocket = serverSocket.accept
} catch {
case e: Exception => logError("ServeMultipleRequests Timeout.")
case e: Exception => { }
}
if (clientSocket != null) {
......
......@@ -17,9 +17,13 @@ object BroadcastTest {
for (i <- 0 until arr1.length)
arr1(i) = i
val barr1 = spark.broadcast(arr1)
spark.parallelize(1 to 10, slices).foreach {
i => println(barr1.value.size)
for (i <- 0 until 2) {
println("Iteration " + i)
println("===========")
val barr1 = spark.broadcast(arr1)
spark.parallelize(1 to 10, slices).foreach {
i => println(barr1.value.size)
}
}
System.exit(0)
......
0% Loading. Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment