Skip to content
Snippets Groups Projects
Commit e41cab04 authored by root's avatar root
Browse files

Avoid creating an extra buffer when saving a stream of values as DISK_ONLY

parent 33fb373e
No related branches found
No related tags found
No related merge requests found
package spark.storage
import java.io.{File, RandomAccessFile}
import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
import java.util.{LinkedHashMap, UUID}
......@@ -8,12 +8,14 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
import scala.collection.mutable.ArrayBuffer
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import spark.{Utils, Logging, Serializer, SizeEstimator}
/**
* Abstract class to store blocks
*/
abstract class BlockStore(blockManager: BlockManager) extends Logging {
abstract class BlockStore(val blockManager: BlockManager) extends Logging {
initLogging()
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel)
......@@ -217,25 +219,28 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
logDebug("Attempting to put block " + blockId)
val startTime = System.currentTimeMillis
val file = createFile(blockId)
if (file != null) {
val channel = new RandomAccessFile(file, "rw").getChannel()
val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
buffer.put(bytes)
channel.close()
val finishTime = System.currentTimeMillis
logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
blockId, bytes.limit, (finishTime - startTime)))
} else {
logError("File not created for block " + blockId)
}
val channel = new RandomAccessFile(file, "rw").getChannel()
val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
buffer.put(bytes)
channel.close()
val finishTime = System.currentTimeMillis
logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
blockId, bytes.limit, (finishTime - startTime)))
}
/**
 * Writes the given iterator of values for `blockId` directly to a file on disk,
 * serializing straight into a buffered file stream so no extra in-memory byte
 * buffer is materialized (the point of this change).
 *
 * @param blockId id of the block being stored; used to name the on-disk file
 * @param values  values to serialize and persist
 * @param level   storage level (unused here; this store always writes to disk)
 * @return Right(buffer) memory-mapping the written file's full contents
 */
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
  : Either[Iterator[Any], ByteBuffer] = {
  logDebug("Attempting to write values for block " + blockId)
  val file = createFile(blockId)
  // Serialize directly to the file; FastBufferedOutputStream avoids per-write syscalls.
  val fileOut = new FastBufferedOutputStream(new FileOutputStream(file))
  val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
  objOut.writeAll(values)
  objOut.close() // also flushes and closes the underlying file stream
  // Memory-map the file to return its contents as a ByteBuffer. Per the
  // FileChannel.map contract the mapping stays valid after the channel is
  // closed, so close it here to avoid leaking the file descriptor.
  val channel = new RandomAccessFile(file, "rw").getChannel()
  try {
    Right(channel.map(MapMode.READ_WRITE, 0, channel.size()))
  } finally {
    channel.close()
  }
}
def getBytes(blockId: String): Option[ByteBuffer] = {
......@@ -267,8 +272,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
newFile.getParentFile.mkdirs()
return newFile
} else {
logError("File for block " + blockId + " already exists on disk, " + file)
return null
throw new Exception("File for block " + blockId + " already exists on disk, " + file)
}
}
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.