From c484b735bb884a81eed3c0d4e4d48a09004feff6 Mon Sep 17 00:00:00 2001
From: Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>
Date: Wed, 22 Dec 2010 17:03:31 -0800
Subject: [PATCH] Bug squashed. CustomParallelInMemoryShuffle is rocking!

We were serializing one (the wrong) thing, trying to deserialize another (the right thing).
---
 src/scala/spark/CustomParallelInMemoryShuffle.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/scala/spark/CustomParallelInMemoryShuffle.scala b/src/scala/spark/CustomParallelInMemoryShuffle.scala
index ee3403e49a..044bcec842 100644
--- a/src/scala/spark/CustomParallelInMemoryShuffle.scala
+++ b/src/scala/spark/CustomParallelInMemoryShuffle.scala
@@ -70,7 +70,7 @@ extends Shuffle[K, V, C] with Logging {
         // Write buckets(i) to a byte array & put in splitsCache instead of file
         val baos = new ByteArrayOutputStream
         val oos = new ObjectOutputStream(baos)
-        oos.writeObject(buckets(i))        
+        buckets(i).foreach(pair => oos.writeObject(pair))
         oos.close
         baos.close
         
@@ -119,7 +119,7 @@ extends Shuffle[K, V, C] with Logging {
           CustomParallelInMemoryShuffle.MaxRxConnections) - 
           threadPool.getActiveCount
       
-        while (hasSplits < totalSplits && numThreadsToCreate > 0) {        
+        while (hasSplits < totalSplits && numThreadsToCreate > 0) {
           // Select a random split to pull
           val splitIndex = selectRandomSplit
           
@@ -190,7 +190,6 @@ extends Shuffle[K, V, C] with Logging {
           
         try{
           while (true) {
-//            logInfo("" + inputStream.readObject.isInstanceOf[(K, C)])
             val (k, c) = inputStream.readObject.asInstanceOf[(K, C)]
             combiners(k) = combiners.get(k) match {
               case Some(oldC) => mergeCombiners(oldC, c)
-- 
GitLab