diff --git a/core/src/main/scala/spark/BlockRDD.scala b/core/src/main/scala/spark/BlockRDD.scala
index ea009f0f4f260f9c37b427ce4a56b6ac070cd70f..daabc0d566756c3917780f4da293b61f70f559fb 100644
--- a/core/src/main/scala/spark/BlockRDD.scala
+++ b/core/src/main/scala/spark/BlockRDD.scala
@@ -7,7 +7,8 @@ class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
 }
 
 
-class BlockRDD[T: ClassManifest](sc: SparkContext, blockIds: Array[String]) extends RDD[T](sc) {
+class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
+  extends RDD[T](sc) {
 
   @transient
   val splits_ = (0 until blockIds.size).map(i => {
diff --git a/core/src/main/scala/spark/DaemonThreadFactory.scala b/core/src/main/scala/spark/DaemonThreadFactory.scala
index 003880c5e838bacb9a4e9388f1b2021e74701931..56e59adeb7152b7d4e6ffe6bb2fef7d8c1dc4132 100644
--- a/core/src/main/scala/spark/DaemonThreadFactory.scala
+++ b/core/src/main/scala/spark/DaemonThreadFactory.scala
@@ -6,9 +6,14 @@ import java.util.concurrent.ThreadFactory
  * A ThreadFactory that creates daemon threads
  */
 private object DaemonThreadFactory extends ThreadFactory {
-  override def newThread(r: Runnable): Thread = {
-    val t = new Thread(r)
-    t.setDaemon(true)
-    return t
+  override def newThread(r: Runnable): Thread = new DaemonThread(r)
+}
+
+private class DaemonThread(r: Runnable = null) extends Thread {
+  setDaemon(true)
+  override def run() {
+    if (r != null) {
+      r.run()
+    }
   }
 }
\ No newline at end of file
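
The factory now delegates to a DaemonThread class so other code can subclass it directly. A minimal usage sketch, assuming code in the same spark package (the pool below is illustrative, not part of this patch):

    import java.util.concurrent.{Executors, ExecutorService}

    // Any pool built on the factory gets daemon threads, so it will not
    // keep the JVM alive at shutdown.
    val pool: ExecutorService = Executors.newCachedThreadPool(DaemonThreadFactory)
    pool.execute(new Runnable {
      override def run() { println("running on a daemon thread") }
    })
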
diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala
index f282a4023b126857c2af23dc6ffae2fab8d8db8f..0befca582d67c0ee38f497e501d7dfc15f4970af 100644
--- a/core/src/main/scala/spark/HadoopRDD.scala
+++ b/core/src/main/scala/spark/HadoopRDD.scala
@@ -42,7 +42,8 @@ class HadoopRDD[K, V](
     minSplits: Int)
   extends RDD[(K, V)](sc) {
   
-  val serializableConf = new SerializableWritable(conf)
+  // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
+  val confBroadcast = sc.broadcast(new SerializableWritable(conf))
   
   @transient
   val splits_ : Array[Split] = {
@@ -66,7 +67,7 @@ class HadoopRDD[K, V](
     val split = theSplit.asInstanceOf[HadoopSplit]
     var reader: RecordReader[K, V] = null
 
-    val conf = serializableConf.value
+    val conf = confBroadcast.value.value
     val fmt = createInputFormat(conf)
     reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
 
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index 65d0532bd58dddaea498fd4d9169eecfc4dea470..8a3f56507149f88e8f5506a6ba90f447e1be7e78 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -10,8 +10,10 @@ import scala.collection.mutable
 import com.esotericsoftware.kryo._
 import com.esotericsoftware.kryo.{Serializer => KSerializer}
 import com.esotericsoftware.kryo.serialize.ClassSerializer
+import com.esotericsoftware.kryo.serialize.SerializableSerializer
 import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
 
+import spark.broadcast._
 import spark.storage._
 
 /**
@@ -203,6 +205,10 @@ class KryoSerializer extends Serializer with Logging {
     kryo.register(classOf[Class[_]], new ClassSerializer(kryo))
     kryo.setRegistrationOptional(true)
 
+    // Allow sending SerializableWritable and HttpBroadcast by falling back to Java serialization
+    kryo.register(classOf[SerializableWritable[_]], new SerializableSerializer())
+    kryo.register(classOf[HttpBroadcast[_]], new SerializableSerializer())
+
     // Register some commonly used Scala singleton objects. Because these
     // are singletons, we must return the exact same local object when we
     // deserialize rather than returning a clone as FieldSerializer would.
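
SerializableSerializer tells Kryo to fall back to plain Java serialization for a class, which is what SerializableWritable and HttpBroadcast rely on because they carry custom writeObject/readObject logic. A hedged sketch of registering one more such class the same way (the class chosen here is only an example):

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.serialize.SerializableSerializer

    // Route a java.io.Serializable class with custom serialization hooks
    // through Java serialization inside Kryo.
    val kryo = new Kryo()
    kryo.register(classOf[java.net.URI], new SerializableSerializer())
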
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index de23eb6f482202f0635b3872947b1145f40243bc..82c139134568b3982dc1c8da20f30d3d348ee004 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -1,5 +1,6 @@
 package spark
 
+import java.io.{DataInputStream, DataOutputStream, ByteArrayOutputStream, ByteArrayInputStream}
 import java.util.concurrent.ConcurrentHashMap
 
 import akka.actor._
@@ -10,6 +11,7 @@ import akka.util.Duration
 import akka.util.Timeout
 import akka.util.duration._
 
+import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
 
 import spark.storage.BlockManagerId
@@ -18,12 +20,11 @@ sealed trait MapOutputTrackerMessage
 case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage 
 case object StopMapOutputTracker extends MapOutputTrackerMessage
 
-class MapOutputTrackerActor(bmAddresses: ConcurrentHashMap[Int, Array[BlockManagerId]]) 
-extends Actor with Logging {
+class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
   def receive = {
     case GetMapOutputLocations(shuffleId: Int) =>
       logInfo("Asked to get map output locations for shuffle " + shuffleId)
-      sender ! bmAddresses.get(shuffleId)
+      sender ! tracker.getSerializedLocations(shuffleId)
 
     case StopMapOutputTracker =>
       logInfo("MapOutputTrackerActor stopped!")
@@ -39,15 +40,19 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg
 
   val timeout = 10.seconds
 
-  private var bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]]
+  var bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]]
 
   // Incremented every time a fetch fails so that client nodes know to clear
   // their cache of map output locations if this happens.
   private var generation: Long = 0
   private var generationLock = new java.lang.Object
 
+  // Cache a serialized version of the output locations for each shuffle to send them out faster
+  var cacheGeneration = generation
+  val cachedSerializedLocs = new HashMap[Int, Array[Byte]]
+
   var trackerActor: ActorRef = if (isMaster) {
-    val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(bmAddresses)), name = actorName)
+    val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(this)), name = actorName)
     logInfo("Registered MapOutputTrackerActor actor")
     actor
   } else {
@@ -134,15 +139,16 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg
       }
       // We won the race to fetch the output locs; do so
       logInfo("Doing the fetch; tracker actor = " + trackerActor)
-      val fetched = askTracker(GetMapOutputLocations(shuffleId)).asInstanceOf[Array[BlockManagerId]]
+      val fetchedBytes = askTracker(GetMapOutputLocations(shuffleId)).asInstanceOf[Array[Byte]]
+      val fetchedLocs = deserializeLocations(fetchedBytes)
       
       logInfo("Got the output locations")
-      bmAddresses.put(shuffleId, fetched)
+      bmAddresses.put(shuffleId, fetchedLocs)
       fetching.synchronized {
         fetching -= shuffleId
         fetching.notifyAll()
       }
-      return fetched
+      return fetchedLocs
     } else {
       return locs
     }
@@ -181,4 +187,70 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg
       }
     }
   }
+
+  def getSerializedLocations(shuffleId: Int): Array[Byte] = {
+    var locs: Array[BlockManagerId] = null
+    var generationGotten: Long = -1
+    generationLock.synchronized {
+      if (generation > cacheGeneration) {
+        cachedSerializedLocs.clear()
+        cacheGeneration = generation
+      }
+      cachedSerializedLocs.get(shuffleId) match {
+        case Some(bytes) =>
+          return bytes
+        case None =>
+          locs = bmAddresses.get(shuffleId)
+          generationGotten = generation
+      }
+    }
+    // If we got here, we failed to find the serialized locations in the cache, so we pulled
+    // out a snapshot of the locations as "locs"; let's serialize and return that
+    val bytes = serializeLocations(locs)
+    // Add them into the table only if the generation hasn't changed while we were working
+    generationLock.synchronized {
+      if (generation == generationGotten) {
+        cachedSerializedLocs(shuffleId) = bytes
+      }
+    }
+    return bytes
+  }
+
+  // Serialize an array of map output locations into an efficient byte format so that we can send
+  // it to reduce tasks. We do this by grouping together the locations by block manager ID.
+  def serializeLocations(locs: Array[BlockManagerId]): Array[Byte] = {
+    val out = new ByteArrayOutputStream
+    val dataOut = new DataOutputStream(out)
+    dataOut.writeInt(locs.length)
+    val grouped = locs.zipWithIndex.filter(_._1 != null).groupBy(_._1)
+    dataOut.writeInt(grouped.size)
+    for ((id, pairs) <- grouped) {
+      dataOut.writeUTF(id.ip)
+      dataOut.writeInt(id.port)
+      dataOut.writeInt(pairs.length)
+      for ((_, blockIndex) <- pairs) {
+        dataOut.writeInt(blockIndex)
+      }
+    }
+    dataOut.close()
+    out.toByteArray
+  }
+
+  // Opposite of serializeLocations.
+  def deserializeLocations(bytes: Array[Byte]): Array[BlockManagerId] = {
+    val dataIn = new DataInputStream(new ByteArrayInputStream(bytes))
+    val length = dataIn.readInt()
+    val array = new Array[BlockManagerId](length)
+    val numGroups = dataIn.readInt()
+    for (i <- 0 until numGroups) {
+      val ip = dataIn.readUTF()
+      val port = dataIn.readInt()
+      val id = new BlockManagerId(ip, port)
+      val numBlocks = dataIn.readInt()
+      for (j <- 0 until numBlocks) {
+        array(dataIn.readInt()) = id
+      }
+    }
+    array
+  }
 }
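
serializeLocations groups map outputs by block manager so each host/port pair is written only once. A worked example of the byte layout (hostnames and ports are illustrative):

    // locs = Array(idA, idB, idA)
    //   where idA = BlockManagerId("host1", 7070), idB = BlockManagerId("host2", 7071)
    // serializeLocations(locs) writes, via the DataOutputStream:
    //   3                        // locs.length
    //   2                        // number of distinct block manager IDs
    //   "host1", 7070, 2, 0, 2   // idA produced map outputs 0 and 2
    //   "host2", 7071, 1, 1      // idB produced map output 1
    // deserializeLocations turns this back into Array(idA, idB, idA).
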
diff --git a/core/src/main/scala/spark/NewHadoopRDD.scala b/core/src/main/scala/spark/NewHadoopRDD.scala
index d024d38aa91e3309508b4d5267cb5e6fb8296ad3..14f708a3f8a6d89b698492621d19acd764fef77a 100644
--- a/core/src/main/scala/spark/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/NewHadoopRDD.scala
@@ -28,7 +28,8 @@ class NewHadoopRDD[K, V](
     @transient conf: Configuration)
   extends RDD[(K, V)](sc) {
   
-  private val serializableConf = new SerializableWritable(conf)
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  val confBroadcast = sc.broadcast(new SerializableWritable(conf))
 
   private val jobtrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -41,7 +43,7 @@ class NewHadoopRDD[K, V](
   @transient
   private val splits_ : Array[Split] = {
     val inputFormat = inputFormatClass.newInstance
-    val jobContext = new JobContext(serializableConf.value, jobId)
+    val jobContext = new JobContext(conf, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Split](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
@@ -54,9 +56,9 @@ class NewHadoopRDD[K, V](
 
   override def compute(theSplit: Split) = new Iterator[(K, V)] {
     val split = theSplit.asInstanceOf[NewHadoopSplit]
-    val conf = serializableConf.value
+    val conf = confBroadcast.value.value
     val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0)
-    val context = new TaskAttemptContext(serializableConf.value, attemptId)
+    val context = new TaskAttemptContext(conf, attemptId)
     val format = inputFormatClass.newInstance
     val reader = format.createRecordReader(split.serializableHadoopSplit.value, context)
     reader.initialize(split.serializableHadoopSplit.value, context)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 3fe8e8a4bf007baa69299d66267145629e80de65..d28f3593fe62d85df86be50540c847941186baa8 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -94,7 +94,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
 
   def getStorageLevel = storageLevel
   
-  def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER): RDD[T] = {
+  def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER_2): RDD[T] = {
     if (!level.useDisk && level.replication < 2) {
       throw new Exception("Cannot checkpoint without using disk or replication (level requested was " + level + ")")
     } 
diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
index 5d21bb793f3dcefce2af736edeb602c47ff0c56f..555b3454ee527bf4fa97e5fb5f0dc1f2e11cd5e2 100644
--- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
@@ -8,6 +8,9 @@ import scala.io.Source
 import java.nio.ByteBuffer
 import java.net.InetAddress
 
+import akka.dispatch.Await
+import akka.util.duration._
+
 object ConnectionManagerTest extends Logging{
   def main(args: Array[String]) {
     if (args.length < 2) {
@@ -53,7 +56,7 @@ object ConnectionManagerTest extends Logging{
           logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
           connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
         })
-        val results = futures.map(f => f())
+        val results = futures.map(f => Await.result(f, 1.second))
         val finishTime = System.currentTimeMillis
         Thread.sleep(5000)
         
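
With Akka 2.x futures the result is no longer pulled out by applying the future as f(); the caller blocks through Await with an explicit timeout. A minimal, self-contained sketch of the pattern (executor and timeout are illustrative):

    import java.util.concurrent.Executors
    import akka.dispatch.{Await, ExecutionContext, Future}
    import akka.util.duration._

    implicit val ec = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
    val f: Future[Int] = Future { 21 * 2 }
    // Blocks for at most one second, then throws a TimeoutException.
    val result = Await.result(f, 1.second)
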
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index b9f0a0d6d0c7f294ea6eb955d8daa74990faa056..99984fb5574fba151388a9b7652222af4e8a45e2 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -31,7 +31,8 @@ object ShuffleMapTask {
         return old
       } else {
         val out = new ByteArrayOutputStream
-        val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
+        val ser = SparkEnv.get.closureSerializer.newInstance
+        val objOut = ser.serializeStream(new GZIPOutputStream(out))
         objOut.writeObject(rdd)
         objOut.writeObject(dep)
         objOut.close()
@@ -63,10 +64,8 @@ object ShuffleMapTask {
     synchronized {
       val loader = Thread.currentThread.getContextClassLoader
       val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-      val objIn = new ObjectInputStream(in) {
-        override def resolveClass(desc: ObjectStreamClass) =
-          Class.forName(desc.getName, false, loader)
-      }
+      val ser = SparkEnv.get.closureSerializer.newInstance
+      val objIn = ser.deserializeStream(in)
       val rdd = objIn.readObject().asInstanceOf[RDD[_]]
       val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]]
       return (rdd, dep)
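
This change swaps the hard-coded ObjectOutputStream/ObjectInputStream pair for whatever closure serializer SparkEnv was configured with, while keeping the GZIP wrapping. A sketch of the same compress-then-serialize round trip using plain Java serialization (one possible closure serializer), not Spark's pluggable API:

    import java.io._
    import java.util.zip.{GZIPInputStream, GZIPOutputStream}

    // Round-trip an object graph through a compressed byte array, the same
    // pipeline shape ShuffleMapTask uses for its (rdd, dep) pair.
    def roundTrip[T <: Serializable](value: T): T = {
      val bytes = new ByteArrayOutputStream
      val out = new ObjectOutputStream(new GZIPOutputStream(bytes))
      out.writeObject(value)
      out.close()
      val in = new ObjectInputStream(
        new GZIPInputStream(new ByteArrayInputStream(bytes.toByteArray)))
      in.readObject().asInstanceOf[T]
    }
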
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 750231ac31e74c6f4e0a1080191d5f21451992a3..952c9766bfc29efdfbc931778ebf8d44452aa378 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -118,6 +118,7 @@ class ClusterScheduler(sc: SparkContext)
    */
   def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = {
     synchronized {
+      SparkEnv.set(sc.env)
       // Mark each slave as alive and remember its hostname
       for (o <- offers) {
         slaveIdToHost(o.slaveId) = o.hostname
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 5f98a396b4250cf5ced1eac9648b7f2bde69658d..e25a11e7c5f140ed8fd06c457d94eebd58ba705c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -243,6 +243,11 @@ class TaskSetManager(
 
   def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     val info = taskInfos(tid)
+    if (info.failed) {
+      // We might get two task-lost messages for the same task in coarse-grained Mesos mode,
+      // or even from Mesos itself when acks get delayed.
+      return
+    }
     val index = info.index
     info.markSuccessful()
     if (!finished(index)) {
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 31784985dcbead81833970ed484cf9b009f04287..fdf007ffb2c26814394d935c5ef50429f374e119 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -80,6 +80,8 @@ class CoarseMesosSchedulerBackend(
         "property, the SPARK_HOME environment variable or the SparkContext constructor")
   }
 
+  val extraCoresPerSlave = System.getProperty("spark.mesos.extra.cores", "0").toInt
+
   var nextMesosTaskId = 0
 
   def newMesosTaskId(): Int = {
@@ -177,7 +179,7 @@ class CoarseMesosSchedulerBackend(
           val task = MesosTaskInfo.newBuilder()
             .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
             .setSlaveId(offer.getSlaveId)
-            .setCommand(createCommand(offer, cpusToUse))
+            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
             .setName("Task " + taskId)
             .addResources(createResource("cpus", cpusToUse))
             .addResources(createResource("mem", executorMemory))
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index cb7b0c8bc1d8b00aa36e0df478ef62ba64eb51c6..3a51f6bd96cbcfc898e9ecac87dd2bbfa072feaa 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -484,8 +484,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     // Initiate the replication before storing it locally. This is faster as 
     // data is already serialized and ready for sending
     val replicationFuture = if (level.replication > 1) {
+      val bufferView = bytes.duplicate() // Doesn't copy the bytes, just creates a view with its own position and limit
       Future {
-        replicate(blockId, bytes, level)
+        replicate(blockId, bufferView, level)
       }
     } else {
       null
@@ -537,21 +538,27 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
    * Replicate block to another node.
    */
 
+  var firstTime = true
+  var peers: Seq[BlockManagerId] = null
   private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
     val tLevel: StorageLevel =
       new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
-    var peers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
+    if (firstTime) {
+      peers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
+      firstTime = false
+    }
     for (peer: BlockManagerId <- peers) {
       val start = System.nanoTime
+      data.rewind()
       logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is "
-        + data.array().length + " Bytes. To node: " + peer)
+        + data.limit() + " Bytes. To node: " + peer)
       if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel),
         new ConnectionManagerId(peer.ip, peer.port))) {
         logError("Failed to call syncPutBlock to " + peer)
       }
       logDebug("Replicated BlockId " + blockId + " once used " +
         (System.nanoTime - start) / 1e6 + " s; The size of the data is " +
-        data.array().length + " bytes.")
+        data.limit() + " bytes.")
     }
   }
 
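
duplicate() and rewind() matter here because the buffer may now be a direct or memory-mapped buffer shared between the local put and the replication thread: duplicate() creates a second ByteBuffer over the same bytes with its own position and limit, and limit() replaces array().length, which is unsupported on such buffers. A small self-contained illustration:

    import java.nio.ByteBuffer

    val buf = ByteBuffer.wrap("block-data".getBytes("UTF-8"))
    val view = buf.duplicate()      // same bytes, independent position/limit
    view.position(view.limit())     // exhausting the view...
    println(buf.remaining())        // ...leaves the original untouched: 10
    view.rewind()                   // ready to be sent again from the start
    println(view.limit())           // size in bytes without touching array(): 10
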
diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
index 0eaa558f448b6e62a34432eb24cc7bacc561205c..0ad1ad056c1ec1e7ae4ccbcbb3ff97b52b5fa66a 100644
--- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
@@ -71,7 +71,7 @@ class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
     logDebug("PutBlock " + id + " started from " + startTimeMs + " with data: " + bytes)
     blockManager.putBytes(id, bytes, level)
     logDebug("PutBlock " + id + " used " + Utils.getUsedTimeMs(startTimeMs)
-        + " with data size: " + bytes.array().length)
+        + " with data size: " + bytes.limit)
   }
 
   private def getBlock(id: String): ByteBuffer = {
diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala
index b9833273e5b15e3a4c8fb7045a8838384d0dc1d8..5e2ccb199a361b5a9372bc4ff0a2860f843c5c6a 100644
--- a/core/src/main/scala/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/spark/storage/BlockMessage.scala
@@ -12,7 +12,7 @@ case class GetBlock(id: String)
 case class GotBlock(id: String, data: ByteBuffer)
 case class PutBlock(id: String, data: ByteBuffer, level: StorageLevel) 
 
-class BlockMessage() extends Logging{
+class BlockMessage() {
   // Un-initialized: typ = 0
   // GetBlock: typ = 1
   // GotBlock: typ = 2
@@ -22,8 +22,6 @@ class BlockMessage() extends Logging{
   private var data: ByteBuffer = null
   private var level: StorageLevel = null
  
-  initLogging()
-
   def set(getBlock: GetBlock) {
     typ = BlockMessage.TYPE_GET_BLOCK
     id = getBlock.id
@@ -62,8 +60,6 @@ class BlockMessage() extends Logging{
     }
     id = idBuilder.toString()
     
-    logDebug("Set from buffer Result: " + typ + " " + id)
-    logDebug("Buffer position is " + buffer.position)
     if (typ == BlockMessage.TYPE_PUT_BLOCK) {
 
       val booleanInt = buffer.getInt()
@@ -77,23 +73,18 @@ class BlockMessage() extends Logging{
       }
       data.put(buffer)
       data.flip()
-      logDebug("Set from buffer Result 2: " + level + " " + data)
     } else if (typ == BlockMessage.TYPE_GOT_BLOCK) {
 
       val dataLength = buffer.getInt()
-      logDebug("Data length is "+ dataLength)
-      logDebug("Buffer position is " + buffer.position)
       data = ByteBuffer.allocate(dataLength)
       if (dataLength != buffer.remaining) {
         throw new Exception("Error parsing buffer")
       }
       data.put(buffer)
       data.flip()
-      logDebug("Set from buffer Result 3: " + data)
     }
 
     val finishTime = System.currentTimeMillis
-    logDebug("Converted " + id + " from bytebuffer in " + (finishTime - startTime) / 1000.0  + " s")
   }
 
   def set(bufferMsg: BufferMessage) {
@@ -145,8 +136,6 @@ class BlockMessage() extends Logging{
       buffers += data
     }
     
-    logDebug("Start to log buffers.")
-    buffers.foreach((x: ByteBuffer) => logDebug("" + x))
     /*
     println()
     println("BlockMessage: ")
@@ -160,7 +149,6 @@ class BlockMessage() extends Logging{
     println()
     */
     val finishTime = System.currentTimeMillis
-    logDebug("Converted " + id + " to buffer message in " + (finishTime - startTime) / 1000.0  + " s")
     return Message.createBufferMessage(buffers)
   }
 
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index f66b5bc897cdd09a99806c76451fd1bc890b540f..09287faba04a3dad9571300baf2ecc455f93ad63 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -87,12 +87,12 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
         blockId, sizeEstimate, freeMemory))
     } else {
-      val entry = new Entry(bytes, bytes.array().length, false)
-      ensureFreeSpace(bytes.array.length)
+      val entry = new Entry(bytes, bytes.limit, false)
+      ensureFreeSpace(bytes.limit)
       memoryStore.synchronized { memoryStore.put(blockId, entry) }
-      currentMemory += bytes.array().length
+      currentMemory += bytes.limit
       logInfo("Block %s stored as %d bytes to memory (free %d)".format(
-        blockId, bytes.array().length, freeMemory))
+        blockId, bytes.limit, freeMemory))
     }
   }
 
@@ -111,12 +111,12 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       return Left(elements.iterator) 
     } else {
       val bytes = dataSerialize(values)
-      ensureFreeSpace(bytes.array().length)
-      val entry = new Entry(bytes, bytes.array().length, false)
+      ensureFreeSpace(bytes.limit)
+      val entry = new Entry(bytes, bytes.limit, false)
       memoryStore.synchronized { memoryStore.put(blockId, entry) } 
-      currentMemory += bytes.array().length
+      currentMemory += bytes.limit
       logInfo("Block %s stored as %d bytes to memory (free %d)".format(
-        blockId, bytes.array.length, freeMemory))
+        blockId, bytes.limit, freeMemory))
       return Right(bytes)
     }
   }
@@ -133,7 +133,7 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     if (entry.deserialized) {
       return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].toIterator)
     } else {
-      return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer])) 
+      return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer].duplicate())) 
     }
   }
 
@@ -219,12 +219,12 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
     val file = createFile(blockId)
     if (file != null) {
       val channel = new RandomAccessFile(file, "rw").getChannel()
-      val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.array.length)
-      buffer.put(bytes.array)
+      val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
+      buffer.put(bytes)
       channel.close()
       val finishTime = System.currentTimeMillis
       logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
-        blockId, bytes.array.length, (finishTime - startTime)))
+        blockId, bytes.limit, (finishTime - startTime)))
     } else {
       logError("File not created for block " + blockId)
     }
@@ -233,7 +233,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
   def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
   : Either[Iterator[Any], ByteBuffer] = {
     val bytes = dataSerialize(values) 
-    logDebug("Converted block " + blockId + " to " + bytes.array.length + " bytes")
+    logDebug("Converted block " + blockId + " to " + bytes.limit + " bytes")
     putBytes(blockId, bytes, level)
     return Right(bytes)
   }
@@ -242,9 +242,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
     val file = getFile(blockId) 
     val length = file.length().toInt
     val channel = new RandomAccessFile(file, "r").getChannel()
-    val bytes = ByteBuffer.allocate(length)
-    bytes.put(channel.map(MapMode.READ_WRITE, 0, length))
-    return Some(bytes)  
+    Some(channel.map(MapMode.READ_ONLY, 0, length))
   }
 
   def getValues(blockId: String): Option[Iterator[Any]] = {
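
getBytes now hands back the MappedByteBuffer from FileChannel.map directly instead of copying it into a heap buffer, which is why the callers above switch from bytes.array() to bytes.limit and duplicate the buffer before iterating over it. A sketch of the read path in isolation (the path is hypothetical):

    import java.io.RandomAccessFile
    import java.nio.channels.FileChannel.MapMode

    val file = new RandomAccessFile("/tmp/some-block", "r")
    val channel = file.getChannel()
    // Maps the file into memory; array() is unsupported on the result,
    // so callers must size and copy it via limit()/duplicate().
    val mapped = channel.map(MapMode.READ_ONLY, 0, channel.size())
    channel.close()
    println(mapped.limit() + " bytes mapped")
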
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index 1d38ca13cc020de8419d69cfd070f0ab70c5ca9c..b168c8e869776b5af3903fe4f596536061e6a4a4 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -66,6 +66,7 @@ class StorageLevel(
 object StorageLevel {
   val NONE = new StorageLevel(false, false, false)
   val DISK_ONLY = new StorageLevel(true, false, false)
+  val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)
   val MEMORY_ONLY = new StorageLevel(false, true, false)
   val MEMORY_ONLY_2 = new StorageLevel(false, true, false, 2)
   val MEMORY_ONLY_DESER = new StorageLevel(false, true, true)
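
DISK_ONLY_2 follows the existing _2 naming convention: the same flags as DISK_ONLY with the replication argument set to 2. Grounded in the checkpoint() signature shown earlier in this patch, a usage sketch (the RDD is illustrative):

    // Satisfies checkpoint()'s "disk or replication >= 2" requirement and
    // keeps each block on the disks of two different nodes.
    someRdd.checkpoint(StorageLevel.DISK_ONLY_2)
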
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index 57d212e4cab04a19f0eb3edee654e4e89d7236bd..df4e23bfd6e8e1fb430d0e4796b004623aa2dd1b 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -31,6 +31,8 @@ object AkkaUtils {
       akka.remote.netty.hostname = "%s"
       akka.remote.netty.port = %d
       akka.remote.netty.connection-timeout = 1s
+      akka.remote.netty.execution-pool-size = 8
+      akka.actor.default-dispatcher.throughput = 30
       """.format(host, port))
 
     val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)