Commit ae8c7d6c authored by Matei Zaharia

Made disk store use multiple directories, deleted ShuffleManager

parent 3d726799
package spark
import java.io._
import java.net.URL
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.{ArrayBuffer, HashMap}
import spark._
class ShuffleManager extends Logging {
private var nextShuffleId = new AtomicLong(0)
private var shuffleDir: File = null
private var server: HttpServer = null
private var serverUri: String = null
initialize()
private def initialize() {
// TODO: localDir should be created by some mechanism common to Spark
// so that it can be shared among shuffle, broadcast, etc
val localDirRoot = System.getProperty("spark.local.dir", "/tmp")
var tries = 0
var foundLocalDir = false
var localDir: File = null
var localDirUuid: UUID = null
while (!foundLocalDir && tries < 10) {
tries += 1
try {
localDirUuid = UUID.randomUUID
localDir = new File(localDirRoot, "spark-local-" + localDirUuid)
if (!localDir.exists) {
localDir.mkdirs()
foundLocalDir = true
}
} catch {
case e: Exception =>
logWarning("Attempt " + tries + " to create local dir failed", e)
}
}
if (!foundLocalDir) {
logError("Failed 10 attempts to create local dir in " + localDirRoot)
System.exit(1)
}
shuffleDir = new File(localDir, "shuffle")
shuffleDir.mkdirs()
logInfo("Shuffle dir: " + shuffleDir)
// Add a shutdown hook to delete the local dir
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dir") {
override def run() {
Utils.deleteRecursively(localDir)
}
})
val extServerPort = System.getProperty(
"spark.localFileShuffle.external.server.port", "-1").toInt
if (extServerPort != -1) {
// We're using an external HTTP server; set URI relative to its root
var extServerPath = System.getProperty(
"spark.localFileShuffle.external.server.path", "")
if (extServerPath != "" && !extServerPath.endsWith("/")) {
extServerPath += "/"
}
serverUri = "http://%s:%d/%s/spark-local-%s".format(
Utils.localIpAddress, extServerPort, extServerPath, localDirUuid)
} else {
// Create our own server
server = new HttpServer(localDir)
server.start()
serverUri = server.uri
}
logInfo("Local URI: " + serverUri)
}
def stop() {
if (server != null) {
server.stop()
}
}
def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = {
val dir = new File(shuffleDir, shuffleId + "/" + inputId)
dir.mkdirs()
val file = new File(dir, "" + outputId)
return file
}
def getServerUri(): String = {
serverUri
}
def newShuffleId(): Long = {
nextShuffleId.getAndIncrement()
}
}
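For context, here is a minimal sketch (not part of this commit) of how a map task might have used the ShuffleManager deleted here. The wrapper object, the buckets shape, and serialization via ObjectOutputStream are illustrative assumptions; only getOutputFile and getServerUri come from the class above.

import java.io.{FileOutputStream, ObjectOutputStream}
import spark.ShuffleManager

object ShuffleManagerSketch {
  // Write one file per (shuffleId, inputId, outputId) bucket and return the URI
  // reducers would fetch them from. Everything except the ShuffleManager calls
  // is hypothetical.
  def writeMapOutput(manager: ShuffleManager, shuffleId: Long, inputId: Int,
                     buckets: Array[Seq[(String, Int)]]): String = {
    for (outputId <- buckets.indices) {
      val file = manager.getOutputFile(shuffleId, inputId, outputId)
      val out = new ObjectOutputStream(new FileOutputStream(file))
      buckets(outputId).foreach(out.writeObject)
      out.close()
    }
    manager.getServerUri()   // files sit under the "shuffle" dir served at this URI
  }
}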
@@ -10,6 +10,13 @@ import spark.storage.BlockManagerMaster
import spark.network.ConnectionManager
import spark.util.AkkaUtils
/**
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
* including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
* Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
* objects needs to have the right SparkEnv set. You can get the current environment with
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
*/
class SparkEnv (
val actorSystem: ActorSystem,
val cache: Cache,
@@ -18,7 +25,6 @@ class SparkEnv (
val cacheTracker: CacheTracker,
val mapOutputTracker: MapOutputTracker,
val shuffleFetcher: ShuffleFetcher,
-val shuffleManager: ShuffleManager,
val broadcastManager: BroadcastManager,
val blockManager: BlockManager,
val connectionManager: ConnectionManager,
@@ -27,7 +33,7 @@
/** No-parameter constructor for unit tests. */
def this() = {
-this(null, null, new JavaSerializer, new JavaSerializer, null, null, null, null, null, null, null, null)
+this(null, null, new JavaSerializer, new JavaSerializer, null, null, null, null, null, null, null)
}
def stop() {
@@ -35,7 +41,6 @@
mapOutputTracker.stop()
cacheTracker.stop()
shuffleFetcher.stop()
-shuffleManager.stop()
broadcastManager.stop()
blockManager.stop()
blockManager.master.stop()
@@ -88,9 +93,7 @@ object SparkEnv {
val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal)
val blockManager = new BlockManager(blockManagerMaster, serializer)
val connectionManager = blockManager.connectionManager
-val shuffleManager = new ShuffleManager()
val broadcastManager = new BroadcastManager(isMaster)
@@ -119,7 +122,6 @@
cacheTracker,
mapOutputTracker,
shuffleFetcher,
-shuffleManager,
broadcastManager,
blockManager,
connectionManager,
......
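The docstring added above describes a thread-local SparkEnv accessed through SparkEnv.get and SparkEnv.set. A minimal sketch of that hand-off, assuming an already-constructed env; the helper object and thread name are illustrative:

import spark.SparkEnv

object SparkEnvHandOffSketch {
  def runWithEnv(env: SparkEnv, work: () => Unit) {
    new Thread("thread that needs SparkEnv") {
      override def run() {
        SparkEnv.set(env)   // install the environment on this thread first
        work()              // code here can now call SparkEnv.get safely
      }
    }.start()
  }
}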
@@ -355,8 +355,8 @@ private object Utils extends Logging {
* This is used, for example, to tell users where in their code each RDD got created.
*/
def getSparkCallSite: String = {
-val trace = Thread.currentThread().getStackTrace().filter( el =>
-(!el.getMethodName().contains("getStackTrace")))
+val trace = Thread.currentThread.getStackTrace().filter( el =>
+(!el.getMethodName.contains("getStackTrace")))
// Keep crawling up the stack trace until we find the first function not inside of the spark
// package. We track the last (shallowest) contiguous Spark method. This might be an RDD
@@ -369,12 +369,12 @@
for (el <- trace) {
if (!finished) {
-if (el.getClassName().contains("spark") && !el.getClassName().startsWith("spark.examples")) {
-lastSparkMethod = el.getMethodName()
+if (el.getClassName.contains("spark") && !el.getClassName.startsWith("spark.examples")) {
+lastSparkMethod = el.getMethodName
}
else {
-firstUserLine = el.getLineNumber()
-firstUserFile = el.getFileName()
+firstUserLine = el.getLineNumber
+firstUserFile = el.getFileName
finished = true
}
}
......
package spark.storage
import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
import java.util.{LinkedHashMap, UUID}
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
import scala.collection.mutable.ArrayBuffer
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-import spark.{Utils, Logging, Serializer, SizeEstimator}
+import spark.Logging
/**
* Abstract class to store blocks
*/
abstract class BlockStore(val blockManager: BlockManager) extends Logging {
initLogging()
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel)
/**
* Put in a block and return its content as either bytes or another Iterator. This is used
* to efficiently write the values to multiple locations (e.g. for replication).
*/
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
: Either[Iterator[Any], ByteBuffer]
/**
* Return the size of a block.
@@ -40,296 +34,3 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
def clear() { }
}
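As the comment on putValues above notes, the Either return lets a caller reuse serialized bytes when the store already produced them (e.g. for replication). A hedged sketch of such a caller; the serialize and replicate callbacks are hypothetical, not BlockManager methods:

import java.nio.ByteBuffer
import spark.storage.{BlockStore, StorageLevel}

object PutValuesSketch {
  def putAndReplicate(store: BlockStore, blockId: String, values: Iterator[Any],
                      level: StorageLevel,
                      serialize: Iterator[Any] => ByteBuffer,
                      replicate: ByteBuffer => Unit) {
    store.putValues(blockId, values, level) match {
      case Right(bytes) => replicate(bytes)            // serialized form already produced
      case Left(iter)   => replicate(serialize(iter))  // e.g. a deserialized memory entry
    }
  }
}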
/**
* Class to store blocks in memory
*/
class MemoryStore(blockManager: BlockManager, maxMemory: Long)
extends BlockStore(blockManager) {
case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false)
private val memoryStore = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L
//private val blockDropper = Executors.newSingleThreadExecutor()
private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
private val blockDropper = new Thread("memory store - block dropper") {
override def run() {
try {
while (true) {
val blockId = blocksToDrop.take()
logDebug("Block " + blockId + " ready to be dropped")
blockManager.dropFromMemory(blockId)
}
} catch {
case ie: InterruptedException =>
logInfo("Shutting down block dropper")
}
}
}
blockDropper.start()
logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory
def getSize(blockId: String): Long = memoryStore.synchronized { memoryStore.get(blockId).size }
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
if (level.deserialized) {
bytes.rewind()
val values = dataDeserialize(bytes)
val elements = new ArrayBuffer[Any]
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
ensureFreeSpace(sizeEstimate)
val entry = new Entry(elements, sizeEstimate, true)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += sizeEstimate
logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
blockId, sizeEstimate, freeMemory))
} else {
val entry = new Entry(bytes, bytes.limit, false)
ensureFreeSpace(bytes.limit)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += bytes.limit
logInfo("Block %s stored as %d bytes to memory (free %d)".format(
blockId, bytes.limit, freeMemory))
}
}
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
: Either[Iterator[Any], ByteBuffer] = {
if (level.deserialized) {
val elements = new ArrayBuffer[Any]
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
ensureFreeSpace(sizeEstimate)
val entry = new Entry(elements, sizeEstimate, true)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += sizeEstimate
logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
blockId, sizeEstimate, freeMemory))
return Left(elements.iterator)
} else {
val bytes = dataSerialize(values)
ensureFreeSpace(bytes.limit)
val entry = new Entry(bytes, bytes.limit, false)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += bytes.limit
logInfo("Block %s stored as %d bytes to memory (free %d)".format(
blockId, bytes.limit, freeMemory))
return Right(bytes)
}
}
def getBytes(blockId: String): Option[ByteBuffer] = {
throw new UnsupportedOperationException("Not implemented")
}
def getValues(blockId: String): Option[Iterator[Any]] = {
val entry = memoryStore.synchronized { memoryStore.get(blockId) }
if (entry == null) {
return None
}
if (entry.deserialized) {
return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
} else {
return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer].duplicate()))
}
}
def remove(blockId: String) {
memoryStore.synchronized {
val entry = memoryStore.get(blockId)
if (entry != null) {
memoryStore.remove(blockId)
currentMemory -= entry.size
logInfo("Block %s of size %d dropped from memory (free %d)".format(
blockId, entry.size, freeMemory))
} else {
logWarning("Block " + blockId + " could not be removed as it doesnt exist")
}
}
}
override def clear() {
memoryStore.synchronized {
memoryStore.clear()
}
//blockDropper.shutdown()
blockDropper.interrupt()
logInfo("MemoryStore cleared")
}
private def ensureFreeSpace(space: Long) {
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
space, currentMemory, maxMemory))
if (maxMemory - currentMemory < space) {
val selectedBlocks = new ArrayBuffer[String]()
var selectedMemory = 0L
memoryStore.synchronized {
val iter = memoryStore.entrySet().iterator()
while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) {
val pair = iter.next()
val blockId = pair.getKey
val entry = pair.getValue()
if (!entry.dropPending) {
selectedBlocks += blockId
entry.dropPending = true
}
selectedMemory += pair.getValue.size
logInfo("Block " + blockId + " selected for dropping")
}
}
logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " +
blocksToDrop.size + " blocks pending")
var i = 0
while (i < selectedBlocks.size) {
blocksToDrop.add(selectedBlocks(i))
i += 1
}
selectedBlocks.clear()
}
}
}
/**
* Class to store blocks on disk
*/
class DiskStore(blockManager: BlockManager, rootDirs: String)
extends BlockStore(blockManager) {
val MAX_DIR_CREATION_ATTEMPTS: Int = 10
val localDirs = createLocalDirs()
var lastLocalDirUsed = 0
addShutdownHook()
def getSize(blockId: String): Long = {
getFile(blockId).length
}
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
logDebug("Attempting to put block " + blockId)
val startTime = System.currentTimeMillis
val file = createFile(blockId)
val channel = new RandomAccessFile(file, "rw").getChannel()
val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
buffer.put(bytes)
channel.close()
val finishTime = System.currentTimeMillis
logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
blockId, bytes.limit, (finishTime - startTime)))
}
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
: Either[Iterator[Any], ByteBuffer] = {
logDebug("Attempting to write values for block " + blockId)
val file = createFile(blockId)
val fileOut = blockManager.wrapForCompression(
new FastBufferedOutputStream(new FileOutputStream(file)))
val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
objOut.writeAll(values)
objOut.close()
// Return a byte buffer for the contents of the file
val channel = new RandomAccessFile(file, "rw").getChannel()
Right(channel.map(MapMode.READ_WRITE, 0, channel.size()))
}
def getBytes(blockId: String): Option[ByteBuffer] = {
val file = getFile(blockId)
val length = file.length().toInt
val channel = new RandomAccessFile(file, "r").getChannel()
Some(channel.map(MapMode.READ_ONLY, 0, length))
}
def getValues(blockId: String): Option[Iterator[Any]] = {
val file = getFile(blockId)
val length = file.length().toInt
val channel = new RandomAccessFile(file, "r").getChannel()
val bytes = channel.map(MapMode.READ_ONLY, 0, length)
val buffer = dataDeserialize(bytes)
channel.close()
return Some(buffer)
}
def remove(blockId: String) {
throw new UnsupportedOperationException("Not implemented")
}
private def createFile(blockId: String): File = {
val file = getFile(blockId)
if (file == null) {
lastLocalDirUsed = (lastLocalDirUsed + 1) % localDirs.size
val newFile = new File(localDirs(lastLocalDirUsed), blockId)
newFile.getParentFile.mkdirs()
return newFile
} else {
throw new Exception("File for block " + blockId + " already exists on disk, " + file)
}
}
private def getFile(blockId: String): File = {
logDebug("Getting file for block " + blockId)
// Search for the file in all the local directories, only one of them should have the file
val files = localDirs.map(localDir => new File(localDir, blockId)).filter(_.exists)
if (files.size > 1) {
throw new Exception("Multiple files for the same block " + blockId + " exist: " +
files.map(_.toString).reduceLeft(_ + ", " + _))
} else if (files.size == 0) {
return null
} else {
logDebug("Got file " + files(0) + " of size " + files(0).length + " bytes")
return files(0)
}
}
private def createLocalDirs(): Seq[File] = {
logDebug("Creating local directories at root dirs '" + rootDirs + "'")
rootDirs.split("[;,:]").map(rootDir => {
var foundLocalDir: Boolean = false
var localDir: File = null
var localDirUuid: UUID = null
var tries = 0
while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
tries += 1
try {
localDirUuid = UUID.randomUUID()
localDir = new File(rootDir, "spark-local-" + localDirUuid)
if (!localDir.exists) {
localDir.mkdirs()
foundLocalDir = true
}
} catch {
case e: Exception =>
logWarning("Attempt " + tries + " to create local dir failed", e)
}
}
if (!foundLocalDir) {
logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
" attempts to create local dir in " + rootDir)
System.exit(1)
}
logDebug("Created local directory at " + localDir)
localDir
})
}
private def addShutdownHook() {
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
override def run() {
logDebug("Shutdown hook called")
localDirs.foreach(localDir => Utils.deleteRecursively(localDir))
}
})
}
}
package spark.storage
import java.nio.ByteBuffer
import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.channels.FileChannel.MapMode
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import java.util.UUID
import spark.Utils
/**
* Stores BlockManager blocks on disk.
*/
class DiskStore(blockManager: BlockManager, rootDirs: String)
extends BlockStore(blockManager) {
val MAX_DIR_CREATION_ATTEMPTS: Int = 10
val SUBDIRS_PER_LOCAL_DIR = 128
// Create one local directory for each path mentioned in spark.local.dir; then, inside this
// directory, create multiple subdirectories that we will hash files into, in order to avoid
// having really large inodes at the top level.
val localDirs = createLocalDirs()
val subDirs = Array.fill(localDirs.length)(new Array[File](SUBDIRS_PER_LOCAL_DIR))
addShutdownHook()
override def getSize(blockId: String): Long = {
getFile(blockId).length
}
override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
logDebug("Attempting to put block " + blockId)
val startTime = System.currentTimeMillis
val file = createFile(blockId)
val channel = new RandomAccessFile(file, "rw").getChannel()
val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
buffer.put(bytes)
channel.close()
val finishTime = System.currentTimeMillis
logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
blockId, bytes.limit, (finishTime - startTime)))
}
override def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
: Either[Iterator[Any], ByteBuffer] = {
logDebug("Attempting to write values for block " + blockId)
val file = createFile(blockId)
val fileOut = blockManager.wrapForCompression(
new FastBufferedOutputStream(new FileOutputStream(file)))
val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
objOut.writeAll(values)
objOut.close()
// Return a byte buffer for the contents of the file
val channel = new RandomAccessFile(file, "rw").getChannel()
Right(channel.map(MapMode.READ_WRITE, 0, channel.size()))
}
override def getBytes(blockId: String): Option[ByteBuffer] = {
val file = getFile(blockId)
val length = file.length().toInt
val channel = new RandomAccessFile(file, "r").getChannel()
Some(channel.map(MapMode.READ_ONLY, 0, length))
}
override def getValues(blockId: String): Option[Iterator[Any]] = {
val file = getFile(blockId)
val length = file.length().toInt
val channel = new RandomAccessFile(file, "r").getChannel()
val bytes = channel.map(MapMode.READ_ONLY, 0, length)
val buffer = dataDeserialize(bytes)
channel.close()
Some(buffer)
}
override def remove(blockId: String) {
throw new UnsupportedOperationException("Not implemented")
}
private def createFile(blockId: String): File = {
val file = getFile(blockId)
if (file.exists()) {
throw new Exception("File for block " + blockId + " already exists on disk: " + file)
}
file
}
private def getFile(blockId: String): File = {
logDebug("Getting file for block " + blockId)
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = math.abs(blockId.hashCode)
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % SUBDIRS_PER_LOCAL_DIR
// Create the subdirectory if it doesn't already exist
val subDir = subDirs(dirId).synchronized {
val old = subDirs(dirId)(subDirId)
if (old != null) {
old
} else {
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
newDir.mkdir()
subDirs(dirId)(subDirId) = newDir
newDir
}
}
new File(subDir, blockId)
}
private def createLocalDirs(): Array[File] = {
logDebug("Creating local directories at root dirs '" + rootDirs + "'")
rootDirs.split(",").map(rootDir => {
var foundLocalDir: Boolean = false
var localDir: File = null
var localDirUuid: UUID = null
var tries = 0
while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
tries += 1
try {
localDirUuid = UUID.randomUUID()
localDir = new File(rootDir, "spark-local-" + localDirUuid)
if (!localDir.exists) {
localDir.mkdirs()
foundLocalDir = true
}
} catch {
case e: Exception =>
logWarning("Attempt " + tries + " to create local dir failed", e)
}
}
if (!foundLocalDir) {
logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
" attempts to create local dir in " + rootDir)
System.exit(1)
}
logInfo("Created local directory at " + localDir)
localDir
})
}
private def addShutdownHook() {
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
override def run() {
logDebug("Shutdown hook called")
localDirs.foreach(localDir => Utils.deleteRecursively(localDir))
}
})
}
}
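A small standalone worked example (illustrative values only) of the hashed placement used by this new DiskStore: assuming rootDirs ultimately comes from a comma-separated spark.local.dir such as /mnt1,/mnt2, a block id is assigned a local dir by its hash modulo the number of dirs, and a two-hex-digit subdirectory inside that dir.

import java.io.File

object DiskLayoutSketch {
  def main(args: Array[String]) {
    val localDirs = Array("/mnt1/spark-local-aaaa", "/mnt2/spark-local-bbbb")  // hypothetical roots
    val subDirsPerLocalDir = 128
    val blockId = "shuffle_0_1_2"                        // hypothetical block id
    val hash = math.abs(blockId.hashCode)
    val dirId = hash % localDirs.length                  // which spark.local.dir entry
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    // Prints something like /mnt1/spark-local-aaaa/<two hex digits>/shuffle_0_1_2
    println(new File(new File(localDirs(dirId), "%02x".format(subDirId)), blockId))
  }
}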
package spark.storage
import java.util.LinkedHashMap
import java.util.concurrent.ArrayBlockingQueue
import spark.{SizeEstimator, Utils}
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
/**
* Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
* serialized ByteBuffers.
*/
class MemoryStore(blockManager: BlockManager, maxMemory: Long)
extends BlockStore(blockManager) {
case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false)
private val memoryStore = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L
//private val blockDropper = Executors.newSingleThreadExecutor()
private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
private val blockDropper = new Thread("memory store - block dropper") {
override def run() {
try {
while (true) {
val blockId = blocksToDrop.take()
logDebug("Block " + blockId + " ready to be dropped")
blockManager.dropFromMemory(blockId)
}
} catch {
case ie: InterruptedException =>
logInfo("Shutting down block dropper")
}
}
}
blockDropper.start()
logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory
override def getSize(blockId: String): Long = {
memoryStore.synchronized {
memoryStore.get(blockId).size
}
}
override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
if (level.deserialized) {
bytes.rewind()
val values = dataDeserialize(bytes)
val elements = new ArrayBuffer[Any]
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
ensureFreeSpace(sizeEstimate)
val entry = new Entry(elements, sizeEstimate, true)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += sizeEstimate
logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
blockId, sizeEstimate, freeMemory))
} else {
val entry = new Entry(bytes, bytes.limit, false)
ensureFreeSpace(bytes.limit)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += bytes.limit
logInfo("Block %s stored as %d bytes to memory (free %d)".format(
blockId, bytes.limit, freeMemory))
}
}
override def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
: Either[Iterator[Any], ByteBuffer] = {
if (level.deserialized) {
val elements = new ArrayBuffer[Any]
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
ensureFreeSpace(sizeEstimate)
val entry = new Entry(elements, sizeEstimate, true)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += sizeEstimate
logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
blockId, sizeEstimate, freeMemory))
return Left(elements.iterator)
} else {
val bytes = dataSerialize(values)
ensureFreeSpace(bytes.limit)
val entry = new Entry(bytes, bytes.limit, false)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += bytes.limit
logInfo("Block %s stored as %d bytes to memory (free %d)".format(
blockId, bytes.limit, freeMemory))
return Right(bytes)
}
}
override def getBytes(blockId: String): Option[ByteBuffer] = {
throw new UnsupportedOperationException("Not implemented")
}
override def getValues(blockId: String): Option[Iterator[Any]] = {
val entry = memoryStore.synchronized { memoryStore.get(blockId) }
if (entry == null) {
return None
}
if (entry.deserialized) {
return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
} else {
return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer].duplicate()))
}
}
override def remove(blockId: String) {
memoryStore.synchronized {
val entry = memoryStore.get(blockId)
if (entry != null) {
memoryStore.remove(blockId)
currentMemory -= entry.size
logInfo("Block %s of size %d dropped from memory (free %d)".format(
blockId, entry.size, freeMemory))
} else {
logWarning("Block " + blockId + " could not be removed as it doesnt exist")
}
}
}
override def clear() {
memoryStore.synchronized {
memoryStore.clear()
}
//blockDropper.shutdown()
blockDropper.interrupt()
logInfo("MemoryStore cleared")
}
private def ensureFreeSpace(space: Long) {
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
space, currentMemory, maxMemory))
if (maxMemory - currentMemory < space) {
val selectedBlocks = new ArrayBuffer[String]()
var selectedMemory = 0L
memoryStore.synchronized {
val iter = memoryStore.entrySet().iterator()
while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) {
val pair = iter.next()
val blockId = pair.getKey
val entry = pair.getValue
if (!entry.dropPending) {
selectedBlocks += blockId
entry.dropPending = true
}
selectedMemory += pair.getValue.size
logInfo("Block " + blockId + " selected for dropping")
}
}
logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " +
blocksToDrop.size + " blocks pending")
var i = 0
while (i < selectedBlocks.size) {
blocksToDrop.add(selectedBlocks(i))
i += 1
}
selectedBlocks.clear()
}
}
}
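A stripped-down, standalone sketch of the eviction hand-off used above (all names here are illustrative): block ids selected under the map's lock go into a bounded queue, and a dedicated dropper thread drains it, so the caller of ensureFreeSpace never performs the drop itself.

import java.util.concurrent.ArrayBlockingQueue

object BlockDropperSketch {
  val blocksToDrop = new ArrayBlockingQueue[String](10000, true)

  val dropper = new Thread("block dropper sketch") {
    override def run() {
      try {
        while (true) {
          val blockId = blocksToDrop.take()   // blocks until an id is queued
          println("dropping " + blockId)      // stand-in for blockManager.dropFromMemory
        }
      } catch {
        case ie: InterruptedException => println("dropper shutting down")
      }
    }
  }

  def main(args: Array[String]) {
    dropper.start()
    blocksToDrop.add("rdd_0_0")               // producer side: id selected for eviction
    Thread.sleep(100)
    dropper.interrupt()
  }
}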