Commit c45d6f12 authored by Matei Zaharia

Merge pull request #501 from tdas/master

Fixed bug in BlockManager and added a testcase
parents 4d480ec5 c02e0649
MemoryStore.scala

@@ -32,8 +32,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
 
   override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
+    bytes.rewind()
     if (level.deserialized) {
-      bytes.rewind()
       val values = blockManager.dataDeserialize(blockId, bytes)
       val elements = new ArrayBuffer[Any]
       elements ++= values
@@ -58,7 +58,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     } else {
       val bytes = blockManager.dataSerialize(blockId, values.iterator)
       tryToPut(blockId, bytes, bytes.limit, false)
-      PutResult(bytes.limit(), Right(bytes))
+      PutResult(bytes.limit(), Right(bytes.duplicate()))
     }
   }
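For context (this note is not part of the commit): both MemoryStore changes guard against the same java.nio.ByteBuffer pitfall. A buffer carries a mutable position, so it must be rewound before its contents are read, and returning the very instance that was stored means a caller draining the returned buffer would also drain the stored block. Moving bytes.rewind() ahead of the branch covers the serialized path too, and bytes.duplicate() hands the caller an independent position/limit over the same underlying bytes. A minimal standalone sketch of those semantics:

import java.nio.ByteBuffer

object ByteBufferSharing {
  def main(args: Array[String]): Unit = {
    val buf = ByteBuffer.allocate(8)
    buf.putInt(42)
    buf.putInt(7)
    // After writing, position == limit, so a reader that forgets to
    // rewind sees zero remaining bytes -- the case the moved rewind() covers.
    buf.rewind()

    // duplicate() shares the underlying storage but not position/limit,
    // so draining the duplicate leaves the original view intact.
    val view = buf.duplicate()
    while (view.hasRemaining) view.get()

    println(view.remaining) // 0: the duplicate is fully consumed
    println(buf.remaining)  // 8: the original still reads from the start
  }
}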
DistributedSuite.scala

@@ -1,5 +1,6 @@
 package spark
 
+import network.ConnectionManagerId
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
 import org.scalatest.matchers.ShouldMatchers
@@ -13,7 +14,7 @@ import com.google.common.io.Files
 import scala.collection.mutable.ArrayBuffer
 
 import SparkContext._
-import storage.StorageLevel
+import storage.{GetBlock, BlockManagerWorker, StorageLevel}
 
 class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext {
@@ -140,9 +141,22 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext {
   test("caching in memory and disk, serialized, replicated") {
     sc = new SparkContext(clusterUrl, "test")
     val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
+
     assert(data.count() === 1000)
     assert(data.count() === 1000)
     assert(data.count() === 1000)
+
+    // Get all the locations of the first partition and try to fetch the partitions
+    // from those locations.
+    val blockIds = data.partitions.indices.map(index => "rdd_%d_%d".format(data.id, index)).toArray
+    val blockId = blockIds(0)
+    val blockManager = SparkEnv.get.blockManager
+    blockManager.master.getLocations(blockId).foreach(id => {
+      val bytes = BlockManagerWorker.syncGetBlock(
+        GetBlock(blockId), ConnectionManagerId(id.ip, id.port))
+      val deserialized = blockManager.dataDeserialize(blockId, bytes).asInstanceOf[Iterator[Int]].toList
+      assert(deserialized === (1 to 100).toList)
+    })
   }
 
   test("compute without caching when no partitions fit in memory") {
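A note on what the new test exercises (again, not part of the commit): cached RDD partitions are stored under block ids of the form rdd_<rddId>_<partitionIndex>, and MEMORY_AND_DISK_SER_2 keeps two serialized replicas, so every location the BlockManagerMaster reports for the first block should serve bytes that deserialize back to that partition's elements. With 1 to 1000 split evenly across 10 partitions, partition 0 holds 1 to 100, which is what the assertion checks. A small sketch of that even slicing (an approximation of how sc.parallelize slices a collection, not the actual Spark code):

// Hypothetical re-derivation of even slicing: partition i of n elements
// over k slices covers indices [i*n/k, (i+1)*n/k), so partition 0 of
// 1 to 1000 over 10 slices is 1 to 100.
object SliceCheck {
  def slice(data: Seq[Int], numSlices: Int): Seq[Seq[Int]] =
    (0 until numSlices).map { i =>
      val start = (i * data.length.toLong / numSlices).toInt
      val end   = ((i + 1) * data.length.toLong / numSlices).toInt
      data.slice(start, end)
    }

  def main(args: Array[String]): Unit = {
    val parts = slice((1 to 1000).toVector, 10)
    println(parts(0) == (1 to 100).toVector)      // true
    println(parts.map(_.length).forall(_ == 100)) // true: all slices even
  }
}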