From 6bfe42a3be4fbf8bc6f93a4709038fda8ad0610b Mon Sep 17 00:00:00 2001
From: Lianhui Wang <lianhuiwang09@gmail.com>
Date: Mon, 25 Apr 2016 12:33:32 -0700
Subject: [PATCH] [SPARK-14731][shuffle]Revert SPARK-12130 to make 2.0 shuffle
 service compatible with 1.x

## What changes were proposed in this pull request?
SPARK-12130 make 2.0 shuffle service incompatible with 1.x. So from discussion: [http://apache-spark-developers-list.1001551.n3.nabble.com/YARN-Shuffle-service-and-its-compatibility-td17222.html](url) we should maintain compatibility between Spark 1.x and Spark 2.x's shuffle service.
I put string comparison into executor's register at first avoid string comparison in getBlockData every time.

## How was this patch tested?
N/A

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #12568 from lianhuiwang/SPARK-14731.
---
 .../shuffle/ExternalShuffleBlockResolver.java     | 15 +++++++++------
 .../spark/network/sasl/SaslIntegrationSuite.java  |  3 ++-
 .../ExternalShuffleBlockResolverSuite.java        | 11 ++++++-----
 .../shuffle/ExternalShuffleCleanupSuite.java      | 15 ++++++++-------
 .../shuffle/ExternalShuffleIntegrationSuite.java  | 11 ++++-------
 .../shuffle/ExternalShuffleSecuritySuite.java     |  3 ++-
 .../org/apache/spark/shuffle/ShuffleManager.scala |  3 ---
 .../spark/shuffle/sort/SortShuffleManager.scala   |  2 --
 .../org/apache/spark/storage/BlockManager.scala   |  2 +-
 .../network/yarn/YarnShuffleServiceSuite.scala    | 13 +++++++------
 10 files changed, 39 insertions(+), 39 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 3071201266..54e870a9b5 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -76,6 +76,10 @@ public class ExternalShuffleBlockResolver {
   @VisibleForTesting
   final DB db;
 
+  private final List<String> knownManagers = Arrays.asList(
+    "org.apache.spark.shuffle.sort.SortShuffleManager",
+    "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");
+
   public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
       throws IOException {
     this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
@@ -149,6 +153,10 @@ public class ExternalShuffleBlockResolver {
       ExecutorShuffleInfo executorInfo) {
     AppExecId fullId = new AppExecId(appId, execId);
     logger.info("Registered executor {} with {}", fullId, executorInfo);
+    if (!knownManagers.contains(executorInfo.shuffleManager)) {
+      throw new UnsupportedOperationException(
+        "Unsupported shuffle manager of executor: " + executorInfo);
+    }
     try {
       if (db != null) {
         byte[] key = dbAppExecKey(fullId);
@@ -183,12 +191,7 @@ public class ExternalShuffleBlockResolver {
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
 
-    if ("sort".equals(executor.shuffleManager) || "tungsten-sort".equals(executor.shuffleManager)) {
-      return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
-    } else {
-      throw new UnsupportedOperationException(
-        "Unsupported shuffle manager: " + executor.shuffleManager);
-    }
+    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
   }
 
   /**
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 5bf9924185..6ba937dddb 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -220,7 +220,8 @@ public class SaslIntegrationSuite {
 
       // Register an executor so that the next steps work.
       ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
-        new String[] { System.getProperty("java.io.tmpdir") }, 1, "sort");
+        new String[] { System.getProperty("java.io.tmpdir") }, 1,
+          "org.apache.spark.shuffle.sort.SortShuffleManager");
       RegisterExecutor regmsg = new RegisterExecutor("app-1", "0", executorInfo);
       client1.sendRpcSync(regmsg.toByteBuffer(), TIMEOUT_MS);
 
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index de4840a588..35d6346474 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -37,6 +37,7 @@ import static org.junit.Assert.*;
 public class ExternalShuffleBlockResolverSuite {
   private static final String sortBlock0 = "Hello!";
   private static final String sortBlock1 = "World!";
+  private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
 
   private static TestShuffleDataContext dataContext;
 
@@ -71,8 +72,8 @@ public class ExternalShuffleBlockResolverSuite {
     }
 
     // Invalid shuffle manager
-    resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
     try {
+      resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
       resolver.getBlockData("app0", "exec2", "shuffle_1_1_0");
       fail("Should have failed");
     } catch (UnsupportedOperationException e) {
@@ -81,7 +82,7 @@ public class ExternalShuffleBlockResolverSuite {
 
     // Nonexistent shuffle block
     resolver.registerExecutor("app0", "exec3",
-      dataContext.createExecutorInfo("sort"));
+      dataContext.createExecutorInfo(SORT_MANAGER));
     try {
       resolver.getBlockData("app0", "exec3", "shuffle_1_1_0");
       fail("Should have failed");
@@ -94,7 +95,7 @@ public class ExternalShuffleBlockResolverSuite {
   public void testSortShuffleBlocks() throws IOException {
     ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
     resolver.registerExecutor("app0", "exec0",
-      dataContext.createExecutorInfo("sort"));
+      dataContext.createExecutorInfo(SORT_MANAGER));
 
     InputStream block0Stream =
       resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
@@ -120,7 +121,7 @@ public class ExternalShuffleBlockResolverSuite {
     assertEquals(parsedAppId, appId);
 
     ExecutorShuffleInfo shuffleInfo =
-      new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "sort");
+      new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, SORT_MANAGER);
     String shuffleJson = mapper.writeValueAsString(shuffleInfo);
     ExecutorShuffleInfo parsedShuffleInfo =
       mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
@@ -131,7 +132,7 @@ public class ExternalShuffleBlockResolverSuite {
     String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
     assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
     String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
-      "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"sort\"}";
+      "\"subDirsPerLocalDir\": 7, \"shuffleManager\": " + "\"" + SORT_MANAGER + "\"}";
     assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
   }
 }
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index fa5cd1398a..bdd218db69 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -37,6 +37,7 @@ public class ExternalShuffleCleanupSuite {
   // Same-thread Executor used to ensure cleanup happens synchronously in test thread.
   private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
   private TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+  private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
 
   @Test
   public void noCleanupAndCleanup() throws IOException {
@@ -44,12 +45,12 @@ public class ExternalShuffleCleanupSuite {
 
     ExternalShuffleBlockResolver resolver =
       new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
-    resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+    resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
     resolver.applicationRemoved("app", false /* cleanup */);
 
     assertStillThere(dataContext);
 
-    resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
+    resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo(SORT_MANAGER));
     resolver.applicationRemoved("app", true /* cleanup */);
 
     assertCleanedUp(dataContext);
@@ -69,7 +70,7 @@ public class ExternalShuffleCleanupSuite {
     ExternalShuffleBlockResolver manager =
       new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);
 
-    manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+    manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
     manager.applicationRemoved("app", true);
 
     assertTrue(cleanupCalled.get());
@@ -87,8 +88,8 @@ public class ExternalShuffleCleanupSuite {
     ExternalShuffleBlockResolver resolver =
       new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
 
-    resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
-    resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
+    resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
+    resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo(SORT_MANAGER));
     resolver.applicationRemoved("app", true);
 
     assertCleanedUp(dataContext0);
@@ -103,8 +104,8 @@ public class ExternalShuffleCleanupSuite {
     ExternalShuffleBlockResolver resolver =
       new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
 
-    resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
-    resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
+    resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
+    resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo(SORT_MANAGER));
 
     resolver.applicationRemoved("app-nonexistent", true);
     assertStillThere(dataContext0);
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 067c815c30..552b5366c5 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -48,8 +48,8 @@ import org.apache.spark.network.util.TransportConf;
 
 public class ExternalShuffleIntegrationSuite {
 
-  static String APP_ID = "app-id";
-  static String SORT_MANAGER = "sort";
+  private static final String APP_ID = "app-id";
+  private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
 
   // Executor 0 is sort-based
   static TestShuffleDataContext dataContext0;
@@ -184,12 +184,9 @@ public class ExternalShuffleIntegrationSuite {
     exec0Fetch.releaseBuffers();
   }
 
-  @Test
-  public void testFetchInvalidShuffle() throws Exception {
+  @Test (expected = RuntimeException.class)
+  public void testRegisterInvalidExecutor() throws Exception {
     registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
-    FetchResult execFetch = fetchBlocks("exec-1", new String[] { "shuffle_1_0_0" });
-    assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
   }
 
   @Test
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index acc1168f83..a0f69ca29a 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -93,7 +93,8 @@ public class ExternalShuffleSecuritySuite {
     client.init(appId);
     // Registration either succeeds or throws an exception.
     client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
-      new ExecutorShuffleInfo(new String[0], 0, ""));
+      new ExecutorShuffleInfo(new String[0], 0,
+        "org.apache.spark.shuffle.sort.SortShuffleManager"));
     client.close();
   }
 
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
index 364fad664e..4ea8a7120a 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -29,9 +29,6 @@ import org.apache.spark.{ShuffleDependency, TaskContext}
  */
 private[spark] trait ShuffleManager {
 
-  /** Return short name for the ShuffleManager */
-  val shortName: String
-
   /**
    * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
    */
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 9bfd966e33..5e977a16fe 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -79,8 +79,6 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
    */
   private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()
 
-  override val shortName: String = "sort"
-
   override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 22bc76b143..1c4921666f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -182,7 +182,7 @@ private[spark] class BlockManager(
     val shuffleConfig = new ExecutorShuffleInfo(
       diskBlockManager.localDirs.map(_.toString),
       diskBlockManager.subDirsPerLocalDir,
-      shuffleManager.shortName)
+      shuffleManager.getClass.getName)
 
     val MAX_ATTEMPTS = 3
     val SLEEP_TIME_SECS = 5
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 5a426b86d1..0e433f6c1b 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 
 class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
   private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
+  private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -87,8 +88,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
 
     val execStateFile = s1.registeredExecutorFile
     execStateFile should not be (null)
-    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
-    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash")
+    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
+    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
 
     val blockHandler = s1.blockHandler
     val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
@@ -158,8 +159,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
 
     val execStateFile = s1.registeredExecutorFile
     execStateFile should not be (null)
-    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
-    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash")
+    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
+    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
 
     val blockHandler = s1.blockHandler
     val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
@@ -186,7 +187,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     s1.initializeApplication(app1Data)
 
     val execStateFile = s1.registeredExecutorFile
-    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
+    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
 
     val blockHandler = s1.blockHandler
     val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
@@ -218,7 +219,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     val app2Data: ApplicationInitializationContext =
       new ApplicationInitializationContext("user", app2Id, null)
     s2.initializeApplication(app2Data)
-    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash")
+    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
     resolver2.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
     ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (Some(shuffleInfo2))
     s2.stop()
-- 
GitLab