Skip to content
Snippets Groups Projects
Commit 4a45019f authored by Aaron Davidson's avatar Aaron Davidson
Browse files

Address Matei's comments

parent da896115
No related branches found
No related tags found
No related merge requests found
...@@ -37,7 +37,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> { ...@@ -37,7 +37,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, String blockIdString) { public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
BlockId blockId = BlockId.apply(blockIdString); BlockId blockId = BlockId.apply(blockIdString);
String path = pResolver.getAbsolutePath(blockId.asFilename()); String path = pResolver.getAbsolutePath(blockId.name());
// if getFilePath returns null, close the channel // if getFilePath returns null, close the channel
if (path == null) { if (path == null) {
//ctx.close(); //ctx.close();
......
...@@ -120,7 +120,7 @@ private object HttpBroadcast extends Logging { ...@@ -120,7 +120,7 @@ private object HttpBroadcast extends Logging {
} }
def write(id: Long, value: Any) { def write(id: Long, value: Any) {
val file = new File(broadcastDir, BroadcastBlockId(id).asFilename) val file = new File(broadcastDir, BroadcastBlockId(id).name)
val out: OutputStream = { val out: OutputStream = {
if (compress) { if (compress) {
compressionCodec.compressedOutputStream(new FileOutputStream(file)) compressionCodec.compressedOutputStream(new FileOutputStream(file))
...@@ -136,7 +136,7 @@ private object HttpBroadcast extends Logging { ...@@ -136,7 +136,7 @@ private object HttpBroadcast extends Logging {
} }
def read[T](id: Long): T = { def read[T](id: Long): T = {
val url = serverUri + "/" + BroadcastBlockId(id).asFilename val url = serverUri + "/" + BroadcastBlockId(id).name
val in = { val in = {
if (compress) { if (compress) {
compressionCodec.compressedInputStream(new URL(url).openStream()) compressionCodec.compressedInputStream(new URL(url).openStream())
......
...@@ -42,7 +42,7 @@ private[spark] class ShuffleCopier extends Logging { ...@@ -42,7 +42,7 @@ private[spark] class ShuffleCopier extends Logging {
try { try {
fc.init() fc.init()
fc.connect(host, port) fc.connect(host, port)
fc.sendRequest(blockId.asFilename) fc.sendRequest(blockId.name)
fc.waitForClose() fc.waitForClose()
fc.close() fc.close()
} catch { } catch {
......
...@@ -64,7 +64,7 @@ private[spark] object ShuffleSender { ...@@ -64,7 +64,7 @@ private[spark] object ShuffleSender {
val dirId = hash % localDirs.length val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
val subDir = new File(localDirs(dirId), "%02x".format(subDirId)) val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
val file = new File(subDir, blockId.asFilename) val file = new File(subDir, blockId.name)
return file.getAbsolutePath return file.getAbsolutePath
} }
} }
......
...@@ -24,13 +24,10 @@ package org.apache.spark.storage ...@@ -24,13 +24,10 @@ package org.apache.spark.storage
* *
* If your BlockId should be serializable, be sure to add it to the BlockId.fromString() method. * If your BlockId should be serializable, be sure to add it to the BlockId.fromString() method.
*/ */
private[spark] abstract class BlockId { private[spark] sealed abstract class BlockId {
/** A globally unique identifier for this Block. Can be used for ser/de. */ /** A globally unique identifier for this Block. Can be used for ser/de. */
def name: String def name: String
/** Physical filename for this block. May not be valid for Blocks are not file-backed. */
def asFilename = name
// convenience methods // convenience methods
def asRDDId = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None def asRDDId = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
def isRDD = isInstanceOf[RDDBlockId] def isRDD = isInstanceOf[RDDBlockId]
...@@ -73,21 +70,27 @@ private[spark] case class TestBlockId(id: String) extends BlockId { ...@@ -73,21 +70,27 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
private[spark] object BlockId { private[spark] object BlockId {
val RDD = "rdd_([0-9]+)_([0-9]+)".r val RDD = "rdd_([0-9]+)_([0-9]+)".r
val Shuffle = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
val Broadcast = "broadcast_([0-9]+)".r val BROADCAST = "broadcast_([0-9]+)".r
val TaskResult = "taskresult_([0-9]+)".r val TASKRESULT = "taskresult_([0-9]+)".r
val StreamInput = "input-([0-9]+)-([0-9]+)".r val STREAM = "input-([0-9]+)-([0-9]+)".r
val Test = "test_(.*)".r val TEST = "test_(.*)".r
/** Converts a BlockId "name" String back into a BlockId. */ /** Converts a BlockId "name" String back into a BlockId. */
def apply(id: String) = id match { def apply(id: String) = id match {
case RDD(rddId, splitIndex) => RDDBlockId(rddId.toInt, splitIndex.toInt) case RDD(rddId, splitIndex) =>
case Shuffle(shuffleId, mapId, reduceId) => RDDBlockId(rddId.toInt, splitIndex.toInt)
case SHUFFLE(shuffleId, mapId, reduceId) =>
ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
case Broadcast(broadcastId) => BroadcastBlockId(broadcastId.toLong) case BROADCAST(broadcastId) =>
case TaskResult(taskId) => TaskResultBlockId(taskId.toLong) BroadcastBlockId(broadcastId.toLong)
case StreamInput(streamId, uniqueId) => StreamBlockId(streamId.toInt, uniqueId.toLong) case TASKRESULT(taskId) =>
case Test(value) => TestBlockId(value) TaskResultBlockId(taskId.toLong)
case _ => throw new IllegalStateException("Unrecognized BlockId: " + id) case STREAM(streamId, uniqueId) =>
StreamBlockId(streamId.toInt, uniqueId.toLong)
case TEST(value) =>
TestBlockId(value)
case _ =>
throw new IllegalStateException("Unrecognized BlockId: " + id)
} }
} }
...@@ -970,7 +970,7 @@ private[spark] class BlockManager( ...@@ -970,7 +970,7 @@ private[spark] class BlockManager(
case ShuffleBlockId(_, _, _) => compressShuffle case ShuffleBlockId(_, _, _) => compressShuffle
case BroadcastBlockId(_) => compressBroadcast case BroadcastBlockId(_) => compressBroadcast
case RDDBlockId(_, _) => compressRdds case RDDBlockId(_, _) => compressRdds
case _ => false // Won't happen in a real cluster, but it can in tests case _ => false
} }
/** /**
......
...@@ -258,7 +258,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) ...@@ -258,7 +258,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
} }
} }
new File(subDir, blockId.asFilename) new File(subDir, blockId.name)
} }
private def createLocalDirs(): Array[File] = { private def createLocalDirs(): Array[File] = {
......
...@@ -22,29 +22,20 @@ import org.scalatest.FunSuite ...@@ -22,29 +22,20 @@ import org.scalatest.FunSuite
class BlockIdSuite extends FunSuite { class BlockIdSuite extends FunSuite {
def assertSame(id1: BlockId, id2: BlockId) { def assertSame(id1: BlockId, id2: BlockId) {
assert(id1.name === id2.name) assert(id1.name === id2.name)
assert(id1.asFilename === id2.asFilename)
assert(id1.hashCode === id2.hashCode) assert(id1.hashCode === id2.hashCode)
assert(id1 === id2) assert(id1 === id2)
} }
def assertDifferent(id1: BlockId, id2: BlockId) { def assertDifferent(id1: BlockId, id2: BlockId) {
assert(id1.name != id2.name) assert(id1.name != id2.name)
assert(id1.asFilename != id2.asFilename)
assert(id1.hashCode != id2.hashCode) assert(id1.hashCode != id2.hashCode)
assert(id1 != id2) assert(id1 != id2)
} }
test("basic-functions") { test("test-bad-deserialization") {
case class MyBlockId(name: String) extends BlockId
val id = MyBlockId("a")
assertSame(id, MyBlockId("a"))
assertDifferent(id, MyBlockId("b"))
assert(id.asRDDId === None)
try { try {
// Try to deserialize an invalid block id. // Try to deserialize an invalid block id.
BlockId("a") BlockId("myblock")
fail() fail()
} catch { } catch {
case e: IllegalStateException => // OK case e: IllegalStateException => // OK
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment