Commit 68c52d80 authored by Reynold Xin

Moved BlockManager's IdGenerator into BlockManager object. Removed some excessive debug messages.
parent 06f855c2
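In effect, the per-JVM ID generator that used to live in GenerationIdUtil now sits on the BlockManager companion object. A minimal sketch of the resulting call pattern, assembled from the diff below (access modifiers and class bodies are elided, so this is illustrative rather than a verbatim excerpt):

  import java.util.concurrent.atomic.AtomicInteger

  // Same shape as the new spark.util.IdGenerator added at the end of this diff.
  class IdGenerator {
    private var id = new AtomicInteger
    def next: Int = id.incrementAndGet
  }

  object BlockManager {
    // Replaces GenerationIdUtil.BLOCK_MANAGER from the deleted spark.util file.
    val ID_GENERATOR = new IdGenerator
  }

  // Inside class BlockManager, each instance now names its slave actor via:
  //   "BlockManagerActor" + BlockManager.ID_GENERATOR.next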
BlockManager.scala
@@ -19,7 +19,7 @@ import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
 import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils}
 import spark.network._
 import spark.serializer.Serializer
-import spark.util.{ByteBufferInputStream, GenerationIdUtil, MetadataCleaner, TimeStampedHashMap}
+import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
 import sun.nio.ch.DirectBuffer
@@ -91,7 +91,7 @@ class BlockManager(
   val host = System.getProperty("spark.hostname", Utils.localHostName())
   val slaveActor = master.actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
-    name = "BlockManagerActor" + GenerationIdUtil.BLOCK_MANAGER.next)
+    name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
   @volatile private var shuttingDown = false
@@ -865,7 +865,7 @@ class BlockManager(
       blockInfo.remove(blockId)
     } else {
       // The block has already been removed; do nothing.
-      logWarning("Block " + blockId + " does not exist.")
+      logWarning("Asked to remove block " + blockId + ", which does not exist")
     }
   }
@@ -951,6 +951,9 @@ class BlockManager(
 private[spark]
 object BlockManager extends Logging {
+
+  val ID_GENERATOR = new IdGenerator
+
   def getMaxMemoryFromSystemProperties: Long = {
     val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
     (Runtime.getRuntime.maxMemory * memoryFraction).toLong
BlockManagerMaster.scala
@@ -20,8 +20,8 @@ private[spark] class BlockManagerMaster(
     masterPort: Int)
   extends Logging {
-  val AKKA_RETRY_ATTEMPS: Int = System.getProperty("spark.akka.num.retries", "5").toInt
-  val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "100").toInt
+  val AKKA_RETRY_ATTEMPS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
+  val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
   val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager"
   val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
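Both retry settings are read from Java system properties, so the new defaults (3 attempts, 3000 ms between attempts) can still be tuned per deployment. A hedged sketch; the property names come from the lines above, while the values are invented for illustration and would have to be set before the BlockManagerMaster is constructed:

  // Hypothetical override of the Akka retry behaviour.
  System.setProperty("spark.akka.num.retries", "5")
  System.setProperty("spark.akka.retry.wait", "500")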
BlockManagerMasterActor.scala
@@ -183,7 +183,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
   private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockManagerId + " "
-    logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
     if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
       logInfo("Got Register Msg from master node, don't register it")
@@ -200,7 +199,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
       blockManagerInfo += (blockManagerId -> new BlockManagerMasterActor.BlockManagerInfo(
         blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor))
     }
-    logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
     sender ! true
   }
@@ -227,7 +225,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     if (blockId == null) {
       blockManagerInfo(blockManagerId).updateLastSeenMs()
-      logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
       sender ! true
       return
     }
@@ -257,15 +254,11 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
   private def getLocations(blockId: String) {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockId + " "
-    logDebug("Got in getLocations 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
     if (blockInfo.containsKey(blockId)) {
       var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
       res.appendAll(blockInfo.get(blockId)._2)
-      logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at "
-        + Utils.getUsedTimeMs(startTimeMs))
       sender ! res.toSeq
     } else {
-      logDebug("Got in getLocations 2" + tmp + Utils.getUsedTimeMs(startTimeMs))
       var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
       sender ! res
     }
@@ -274,25 +267,20 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
   private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
     def getLocations(blockId: String): Seq[BlockManagerId] = {
       val tmp = blockId
-      logDebug("Got in getLocationsMultipleBlockIds Sub 0 " + tmp)
       if (blockInfo.containsKey(blockId)) {
         var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
         res.appendAll(blockInfo.get(blockId)._2)
-        logDebug("Got in getLocationsMultipleBlockIds Sub 1 " + tmp + " " + res.toSeq)
         return res.toSeq
       } else {
-        logDebug("Got in getLocationsMultipleBlockIds Sub 2 " + tmp)
         var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
         return res.toSeq
       }
     }
-    logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq)
     var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
     for (blockId <- blockIds) {
       res.append(getLocations(blockId))
     }
-    logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq + " : " + res.toSeq)
     sender ! res.toSeq
   }
GenerationIdUtil.scala (deleted)
-package spark.util
-import java.util.concurrent.atomic.AtomicInteger
-private[spark]
-object GenerationIdUtil {
-  val BLOCK_MANAGER = new IdGenerator
-  /**
-   * A util used to get a unique generation ID. This is a wrapper around
-   * Java's AtomicInteger.
-   */
-  class IdGenerator {
-    private var id = new AtomicInteger
-    def next: Int = id.incrementAndGet
-  }
-}
IdGenerator.scala (new file)
+package spark.util
+import java.util.concurrent.atomic.AtomicInteger
+/**
+ * A util used to get a unique generation ID. This is a wrapper around Java's
+ * AtomicInteger. An example usage is in BlockManager, where each BlockManager
+ * instance would start an Akka actor and we use this utility to assign the Akka
+ * actors unique names.
+ */
+private[spark] class IdGenerator {
+  private var id = new AtomicInteger
+  def next: Int = id.incrementAndGet
+}
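Since next simply delegates to AtomicInteger.incrementAndGet, successive calls return 1, 2, 3, ..., which is what keeps the generated Akka actor names unique within a JVM. A small usage sketch (it assumes code living inside the spark package, since the class is private[spark]; the val names and the reuse of the "BlockManagerActor" prefix are illustrative only):

  val gen = new IdGenerator
  val first  = "BlockManagerActor" + gen.next   // "BlockManagerActor1"
  val second = "BlockManagerActor" + gen.next   // "BlockManagerActor2"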