From 1d9f0df0652f455145d2dfed43a9407df6de6c43 Mon Sep 17 00:00:00 2001
From: Shivaram Venkataraman <shivaram@eecs.berkeley.edu>
Date: Thu, 13 Jun 2013 14:46:25 -0700
Subject: [PATCH] Fix some comments and style

---
 core/src/main/java/spark/network/netty/FileClient.java    | 2 +-
 .../main/scala/spark/network/netty/ShuffleCopier.scala    | 8 ++++----
 .../main/scala/spark/storage/BlockFetcherIterator.scala   | 6 +-----
 core/src/main/scala/spark/storage/DiskStore.scala         | 3 +--
 core/src/test/scala/spark/ShuffleSuite.scala              | 3 +--
 5 files changed, 8 insertions(+), 14 deletions(-)

diff --git a/core/src/main/java/spark/network/netty/FileClient.java b/core/src/main/java/spark/network/netty/FileClient.java
index 517772202f..a4bb4bc701 100644
--- a/core/src/main/java/spark/network/netty/FileClient.java
+++ b/core/src/main/java/spark/network/netty/FileClient.java
@@ -30,7 +30,7 @@ class FileClient {
       .channel(OioSocketChannel.class)
       .option(ChannelOption.SO_KEEPALIVE, true)
       .option(ChannelOption.TCP_NODELAY, true)
-      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout) // Disable connect timeout
+      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
       .handler(new FileClientChannelInitializer(handler));
   }
 
diff --git a/core/src/main/scala/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/spark/network/netty/ShuffleCopier.scala
index afb2cdbb3a..8d5194a737 100644
--- a/core/src/main/scala/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/spark/network/netty/ShuffleCopier.scala
@@ -18,8 +18,9 @@ private[spark] class ShuffleCopier extends Logging {
       resultCollectCallback: (String, Long, ByteBuf) => Unit) {
 
     val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
-    val fc = new FileClient(handler,
-                            System.getProperty("spark.shuffle.netty.connect.timeout", "60000").toInt)
+    val connectTimeout = System.getProperty("spark.shuffle.netty.connect.timeout", "60000").toInt
+    val fc = new FileClient(handler, connectTimeout)
+
     try {
       fc.init()
       fc.connect(host, port)
@@ -29,8 +30,7 @@ private[spark] class ShuffleCopier extends Logging {
     } catch {
       // Handle any socket-related exceptions in FileClient
       case e: Exception => {
-        logError("Shuffle copy of block " + blockId + " from " + host + ":" + port + 
-          " failed", e)
+        logError("Shuffle copy of block " + blockId + " from " + host + ":" + port + " failed", e)
         handler.handleError(blockId)
       }
     }
diff --git a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
index bb78207c9f..bec876213e 100644
--- a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
@@ -322,11 +322,7 @@ object BlockFetcherIterator {
     override def next(): (String, Option[Iterator[Any]]) = {
       resultsGotten += 1
       val result = results.take()
-      // if all the results has been retrieved, shutdown the copiers
-      // NO need to stop the copiers if we got all the blocks ?
-      // if (resultsGotten == _numBlocksToFetch && copiers != null) {
-      //   stopCopiers()
-      // }
+      // If all the results have been retrieved, copiers will exit automatically
       (result.blockId, if (result.failed) None else Some(result.deserialize()))
     }
   }
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 0af6e4a359..15ab840155 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -212,10 +212,9 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     val file = getFile(blockId)
     if (!allowAppendExisting && file.exists()) {
       // NOTE(shivaram): Delete the file if it exists. This might happen if a ShuffleMap task
-      // was rescheduled on the same machine as the old task ?
+      // was rescheduled on the same machine as the old task.
       logWarning("File for block " + blockId + " already exists on disk: " + file + ". Deleting")
       file.delete()
-      // throw new Exception("File for block " + blockId + " already exists on disk: " + file)
     }
     file
   }
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 33b02fff80..1916885a73 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -376,8 +376,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
     val a = sc.parallelize(1 to 4, NUM_BLOCKS)
     val b = a.map(x => (x, x*2))
 
-    // NOTE: The default Java serializer doesn't create zero-sized blocks.
-    //       So, use Kryo
+    // NOTE: The default Java serializer should create zero-sized blocks
     val c = new ShuffledRDD(b, new HashPartitioner(10))
 
     val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
-- 
GitLab