diff --git a/src/scala/spark/LocalFileShuffle.scala b/src/scala/spark/LocalFileShuffle.scala index eb9905b77f7e691f0cb80d1d774eedd86c8fae6e..e3998b7774eeeed579bd4984466f78e22c954c66 100644 --- a/src/scala/spark/LocalFileShuffle.scala +++ b/src/scala/spark/LocalFileShuffle.scala @@ -146,8 +146,6 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { private var oosSource: ObjectOutputStream = null private var oisSource: ObjectInputStream = null - private var byteArray: Array[Byte] = null - private var receptionSucceeded = false override def run: Unit = { @@ -187,21 +185,9 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { // Receive the file if (requestedFileLen != -1) { - byteArray = new Array[Byte] (requestedFileLen) - var bytesRead = isSource.read (byteArray, 0, byteArray.length) - var alreadyRead = bytesRead - - while (alreadyRead < requestedFileLen) { - bytesRead = isSource.read(byteArray, alreadyRead, - (byteArray.length - alreadyRead)) - if(bytesRead > 0) { - alreadyRead = alreadyRead + bytesRead - } - } - - // Now add this to combiners - val inputStream = new ObjectInputStream ( - new ByteArrayInputStream(byteArray)) + // Add this to combiners + val inputStream = new ObjectInputStream (isSource) + try{ while (true) { val (k, c) = inputStream.readObject.asInstanceOf[(K, C)]