diff --git a/src/scala/spark/BitTorrentBroadcast.scala b/src/scala/spark/BitTorrentBroadcast.scala index 6945cb743f63178da4f8a68b0b3653f9bdc8d9bf..0ef0d92071117c8db71b6ce2459b34816ae3f6a8 100644 --- a/src/scala/spark/BitTorrentBroadcast.scala +++ b/src/scala/spark/BitTorrentBroadcast.scala @@ -559,7 +559,7 @@ extends Broadcast[T] with Logging { while (hasBlocks < totalBlocks && keepReceiving) { blockToAskFor = - pickBlockToRequest (newPeerToTalkTo.hasBlocksBitVector) + pickBlockRarestFirst (newPeerToTalkTo.hasBlocksBitVector) // No block to request if (blockToAskFor < 0) { @@ -635,11 +635,10 @@ extends Broadcast[T] with Logging { cleanUpConnections } - } + } // Right now it picks a block uniformly that this peer does not have - // TODO: Implement more intelligent block selection policies - private def pickBlockToRequest (txHasBlocksBitVector: BitSet): Int = { + private def pickBlockRandom (txHasBlocksBitVector: BitSet): Int = { var needBlocksBitVector: BitSet = null // Blocks already present @@ -658,7 +657,7 @@ extends Broadcast[T] with Logging { // Find blocks that are neither here nor in transit needBlocksBitVector.flip (0, needBlocksBitVector.size) - // Blocks that should be requested + // Blocks that should/can be requested needBlocksBitVector.and (txHasBlocksBitVector) if (needBlocksBitVector.cardinality == 0) { @@ -678,6 +677,60 @@ extends Broadcast[T] with Logging { } } + // Pick the block that seems to be the rarest across sources + private def pickBlockRarestFirst (txHasBlocksBitVector: BitSet): Int = { + var needBlocksBitVector: BitSet = null + + // Blocks already present + hasBlocksBitVector.synchronized { + needBlocksBitVector = hasBlocksBitVector.clone.asInstanceOf[BitSet] + } + + // Include blocks already in transmission ONLY IF + // BitTorrentBroadcast.EndGameFraction has NOT been achieved + if ((1.0 * hasBlocks / totalBlocks) < BitTorrentBroadcast.EndGameFraction) { + blocksInRequestBitVector.synchronized { + needBlocksBitVector.or (blocksInRequestBitVector) + } + } + + // Find blocks that are neither here nor in transit + needBlocksBitVector.flip (0, needBlocksBitVector.size) + + // Blocks that should/can be requested + needBlocksBitVector.and (txHasBlocksBitVector) + + if (needBlocksBitVector.cardinality == 0) { + return -1 + } else { + // Count the number of copies for each block across all sources + var numCopiesPerBlock = Array.tabulate [Int] (totalBlocks) (_ => 0) + + listOfSources.synchronized { + listOfSources.foreach { eachSource => + for (i <- 0 until totalBlocks) { + numCopiesPerBlock(i) += + ( if (eachSource.hasBlocksBitVector.get (i)) 1 else 0 ) + } + } + } + + // Find the block with the minimum copies that this peer does not have + var minVal = Integer.MAX_VALUE + var minIndex = -1 + for (i <- 0 until totalBlocks) { + if (needBlocksBitVector.get (i) && + numCopiesPerBlock(i) > 0 && + numCopiesPerBlock(i) < minVal) { + minVal = numCopiesPerBlock(i) + minIndex = i + } + } + + return minIndex + } + } + private def cleanUpConnections: Unit = { if (oisSource != null) { oisSource.close