diff --git a/src/scala/spark/CustomParallelInMemoryShuffle.scala b/src/scala/spark/CustomParallelInMemoryShuffle.scala
index ee3403e49ac7d910a2adf74314e5d6f8c2ec7599..044bcec8429af8522d26fa4aa52f11161868009d 100644
--- a/src/scala/spark/CustomParallelInMemoryShuffle.scala
+++ b/src/scala/spark/CustomParallelInMemoryShuffle.scala
@@ -70,7 +70,7 @@ extends Shuffle[K, V, C] with Logging {
       // Write buckets(i) to a byte array & put in splitsCache instead of file
       val baos = new ByteArrayOutputStream
       val oos = new ObjectOutputStream(baos)
-      oos.writeObject(buckets(i))
+      buckets(i).foreach(pair => oos.writeObject(pair))
       oos.close
       baos.close
 
@@ -119,7 +119,7 @@ extends Shuffle[K, V, C] with Logging {
        CustomParallelInMemoryShuffle.MaxRxConnections) - 
        threadPool.getActiveCount
 
-    while (hasSplits < totalSplits && numThreadsToCreate > 0) { 
+    while (hasSplits < totalSplits && numThreadsToCreate > 0) {
       // Select a random split to pull
       val splitIndex = selectRandomSplit
 
@@ -190,7 +190,6 @@ extends Shuffle[K, V, C] with Logging {
 
     try{
       while (true) {
-//        logInfo("" + inputStream.readObject.isInstanceOf[(K, C)])
        val (k, c) = inputStream.readObject.asInstanceOf[(K, C)]
        combiners(k) = combiners.get(k) match {
          case Some(oldC) => mergeCombiners(oldC, c)