diff --git a/src/scala/spark/BitTorrentBroadcast.scala b/src/scala/spark/BitTorrentBroadcast.scala index 60bed8511d731464af5df5b140101f50245e5b0d..e029108b09e946507d4b7eab3e7cd94a85bf20cc 100644 --- a/src/scala/spark/BitTorrentBroadcast.scala +++ b/src/scala/spark/BitTorrentBroadcast.scala @@ -896,11 +896,11 @@ extends Broadcast with Logging { class ServeMultipleRequests extends Thread with Logging { + // Server at most BitTorrentBroadcast.MaxRxPeers peers + var threadPool = + Broadcast.newDaemonFixedThreadPool(BitTorrentBroadcast.MaxRxPeers) + override def run: Unit = { - // Server at most BitTorrentBroadcast.MaxRxPeers peers - var threadPool = - Broadcast.newDaemonFixedThreadPool(BitTorrentBroadcast.MaxRxPeers) - var serverSocket = new ServerSocket (0) listenPort = serverSocket.getLocalPort @@ -975,8 +975,7 @@ extends Broadcast with Logging { var keepSending = true var numBlocksToSend = BitTorrentBroadcast.MaxChatBlocks - while (!stopBroadcast && keepSending && numBlocksToSend > 0 && - (curTime - startTime) < BitTorrentBroadcast.MaxChatTime) { + while (!stopBroadcast && keepSending && numBlocksToSend > 0) { // Receive which block to send val blockToSend = ois.readObject.asInstanceOf[Int] @@ -992,6 +991,11 @@ extends Broadcast with Logging { addToListOfSources (rxSourceInfo) curTime = System.currentTimeMillis + // Revoke sending only if there is anyone waiting in the queue + if (curTime - startTime >= BitTorrentBroadcast.MaxChatTime && + threadPool.getQueue.size > 0) { + keepSending = false + } } } catch { // If something went wrong, e.g., the worker at the other end died etc.