diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index ce5c68e85375e56781c7968d8a82c145306a0cd0..30712012669bd1811955a2a30df5b685aeb8dda0 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -49,7 +49,7 @@ import org.apache.spark.network.util.TransportConf;
  * Manages converting shuffle BlockIds into physical segments of local files, from a process outside
  * of Executors. Each Executor must register its own configuration about where it stores its files
  * (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
- * from Spark's FileShuffleBlockResolver and IndexShuffleBlockResolver.
+ * from Spark's IndexShuffleBlockResolver.
  */
 public class ExternalShuffleBlockResolver {
   private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
@@ -185,8 +185,6 @@ public class ExternalShuffleBlockResolver {
     if ("sort".equals(executor.shuffleManager) ||
         "tungsten-sort".equals(executor.shuffleManager)) {
       return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
-    } else if ("hash".equals(executor.shuffleManager)) {
-      return getHashBasedShuffleBlockData(executor, blockId);
     } else {
       throw new UnsupportedOperationException(
         "Unsupported shuffle manager: " + executor.shuffleManager);
@@ -250,15 +248,6 @@ public class ExternalShuffleBlockResolver {
     }
   }
 
-  /**
-   * Hash-based shuffle data is simply stored as one file per block.
-   * This logic is from FileShuffleBlockResolver.
-   */
-  private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
-    File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
-    return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
-  }
-
   /**
    * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
    * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
index 102d4efb8bf3b445045e663d955c8c192c94172b..93758bdc58fb0e385d2bb6325711ecdfdb366362 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
@@ -33,7 +33,7 @@ public class ExecutorShuffleInfo implements Encodable {
   public final String[] localDirs;
   /** Number of subdirectories created within each localDir. */
   public final int subDirsPerLocalDir;
-  /** Shuffle manager (SortShuffleManager or HashShuffleManager) that the executor is using. */
+  /** Shuffle manager (SortShuffleManager) that the executor is using. */
   public final String shuffleManager;
 
   @JsonCreator
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index d9b5f0261aabaf52bbec8fba65a2f9e225182472..de4840a5880c2dd9b0b1bf3e9255102d46de83c2 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -38,9 +38,6 @@ public class ExternalShuffleBlockResolverSuite {
   private static final String sortBlock0 = "Hello!";
   private static final String sortBlock1 = "World!";
 
-  private static final String hashBlock0 = "Elementary";
-  private static final String hashBlock1 = "Tabular";
-
   private static TestShuffleDataContext dataContext;
 
   private static final TransportConf conf =
@@ -51,13 +48,10 @@ public class ExternalShuffleBlockResolverSuite {
     dataContext = new TestShuffleDataContext(2, 5);
 
     dataContext.create();
-    // Write some sort and hash data.
+    // Write some sort data.
     dataContext.insertSortShuffleData(0, 0, new byte[][] {
         sortBlock0.getBytes(StandardCharsets.UTF_8),
         sortBlock1.getBytes(StandardCharsets.UTF_8)});
-    dataContext.insertHashShuffleData(1, 0, new byte[][] {
-        hashBlock0.getBytes(StandardCharsets.UTF_8),
-        hashBlock1.getBytes(StandardCharsets.UTF_8)});
   }
 
   @AfterClass
@@ -117,27 +111,6 @@ public class ExternalShuffleBlockResolverSuite {
     assertEquals(sortBlock1, block1);
   }
 
-  @Test
-  public void testHashShuffleBlocks() throws IOException {
-    ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
-    resolver.registerExecutor("app0", "exec0",
-      dataContext.createExecutorInfo("hash"));
-
-    InputStream block0Stream =
-      resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
-    String block0 = CharStreams.toString(
-        new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
-    block0Stream.close();
-    assertEquals(hashBlock0, block0);
-
-    InputStream block1Stream =
-      resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
-    String block1 = CharStreams.toString(
-        new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
-    block1Stream.close();
-    assertEquals(hashBlock1, block1);
-  }
-
   @Test
   public void jsonSerializationOfExecutorRegistration() throws IOException {
     ObjectMapper mapper = new ObjectMapper();
@@ -147,7 +120,7 @@ public class ExternalShuffleBlockResolverSuite {
     assertEquals(parsedAppId, appId);
 
     ExecutorShuffleInfo shuffleInfo =
-      new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "hash");
+      new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "sort");
     String shuffleJson = mapper.writeValueAsString(shuffleInfo);
     ExecutorShuffleInfo parsedShuffleInfo =
       mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
@@ -158,7 +131,7 @@ public class ExternalShuffleBlockResolverSuite {
     String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
     assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
     String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
-      "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"hash\"}";
+      "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"sort\"}";
    assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
   }
 }
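
The surviving sort-based path is worth spelling out, since the external service now supports nothing else. Below is a minimal sketch of the index-file arithmetic described in the resolver's javadoc above: each entry in "shuffle_ShuffleId_MapId_0.index" is an 8-byte offset into the companion .data file, so a reducer's segment is bounded by its own entry and the next one. IndexLookupSketch and readOffsetAndLength are hypothetical names for illustration, not Spark API.

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

class IndexLookupSketch {
  // Returns {offset, length} of reducer reduceId's segment in the .data file.
  static long[] readOffsetAndLength(File indexFile, int reduceId) throws IOException {
    try (DataInputStream in = new DataInputStream(new FileInputStream(indexFile))) {
      in.skipBytes(reduceId * 8);       // each index entry is one 8-byte long (assumes the skip completes)
      long offset = in.readLong();      // where this reducer's segment starts
      long nextOffset = in.readLong();  // where the next segment starts
      return new long[] { offset, nextOffset - offset };
    }
  }
}
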
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index 43d02014058723010e31375ca5c97269ba9a970a..fa5cd1398aa0fbe87d7510de42631ec6797035fa 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -144,9 +144,6 @@ public class ExternalShuffleCleanupSuite {
     dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] {
         "ABC".getBytes(StandardCharsets.UTF_8),
         "DEF".getBytes(StandardCharsets.UTF_8)});
-    dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000, new byte[][] {
-        "GHI".getBytes(StandardCharsets.UTF_8),
-        "JKLMNOPQRSTUVWXYZ".getBytes(StandardCharsets.UTF_8)});
     return dataContext;
   }
 }
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index ecbbe7bfa3b115d48eb5e11216ce991109ec3342..067c815c30a51fb5f51f9b9f3ca02b19f9d8cb07 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -50,12 +50,9 @@ public class ExternalShuffleIntegrationSuite {
 
   static String APP_ID = "app-id";
   static String SORT_MANAGER = "sort";
-  static String HASH_MANAGER = "hash";
 
   // Executor 0 is sort-based
   static TestShuffleDataContext dataContext0;
 
-  // Executor 1 is hash-based
-  static TestShuffleDataContext dataContext1;
 
   static ExternalShuffleBlockHandler handler;
   static TransportServer server;
@@ -87,10 +84,6 @@ public class ExternalShuffleIntegrationSuite {
     dataContext0.create();
     dataContext0.insertSortShuffleData(0, 0, exec0Blocks);
 
-    dataContext1 = new TestShuffleDataContext(6, 2);
-    dataContext1.create();
-    dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
-
     conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     handler = new ExternalShuffleBlockHandler(conf, null);
     TransportContext transportContext = new TransportContext(conf, handler);
@@ -100,7 +93,6 @@ public class ExternalShuffleIntegrationSuite {
   @AfterClass
   public static void afterAll() {
     dataContext0.cleanup();
-    dataContext1.cleanup();
     server.close();
   }
 
@@ -192,40 +184,18 @@ public class ExternalShuffleIntegrationSuite {
     exec0Fetch.releaseBuffers();
   }
 
-  @Test
-  public void testFetchHash() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo(HASH_MANAGER));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
-    assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.successBlocks);
-    assertTrue(execFetch.failedBlocks.isEmpty());
-    assertBufferListsEqual(execFetch.buffers, Lists.newArrayList(exec1Blocks));
-    execFetch.releaseBuffers();
-  }
-
-  @Test
-  public void testFetchWrongShuffle() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo(SORT_MANAGER /* wrong manager */));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
-    assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
-  }
-
   @Test
   public void testFetchInvalidShuffle() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo("unknown sort manager"));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "shuffle_1_0_0" });
+    registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
+    FetchResult execFetch = fetchBlocks("exec-1", new String[] { "shuffle_1_0_0" });
     assertTrue(execFetch.successBlocks.isEmpty());
     assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
   }
 
   @Test
   public void testFetchWrongBlockId() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo(SORT_MANAGER /* wrong manager */));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "rdd_1_0_0" });
+    registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
+    FetchResult execFetch = fetchBlocks("exec-1", new String[] { "rdd_1_0_0" });
     assertTrue(execFetch.successBlocks.isEmpty());
     assertEquals(Sets.newHashSet("rdd_1_0_0"), execFetch.failedBlocks);
   }
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 7ac1ca128aed0d9193a25db9f0a0c93f24f4ea89..62a1fb42b023d632dbc6c29610912c60a9a02831 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -29,7 +29,7 @@ import com.google.common.io.Files;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 
 /**
- * Manages some sort- and hash-based shuffle data, including the creation
+ * Manages some sort-shuffle data, including the creation
  * and cleanup of directories that can be read by the {@link ExternalShuffleBlockResolver}.
  */
 public class TestShuffleDataContext {
@@ -85,15 +85,6 @@ public class TestShuffleDataContext {
     }
   }
 
-  /** Creates reducer blocks in a hash-based data format within our local dirs. */
-  public void insertHashShuffleData(int shuffleId, int mapId, byte[][] blocks) throws IOException {
-    for (int i = 0; i < blocks.length; i ++) {
-      String blockId = "shuffle_" + shuffleId + "_" + mapId + "_" + i;
-      Files.write(blocks[i],
-        ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId));
-    }
-  }
-
   /**
    * Creates an ExecutorShuffleInfo object based on the given shuffle manager which targets this
    * context's directories.
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 3d11db7461c053a0835f6bba5a5fb9250db2232b..27497e21b829dd6da62b00bde13d24b7b9f2ab2e 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -298,9 +298,8 @@ object SparkEnv extends Logging {
 
     // Let the user specify short names for shuffle managers
     val shortShuffleMgrNames = Map(
-      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
-      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
-      "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
+      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
+      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
     val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
     val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
deleted file mode 100644
index be1e84a2ba938330ba78607048204aa83477d6f9..0000000000000000000000000000000000000000
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle
-
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.internal.Logging
-import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.serializer.Serializer
-import org.apache.spark.storage._
-import org.apache.spark.util.Utils
-
-/** A group of writers for a ShuffleMapTask, one writer per reducer. */
-private[spark] trait ShuffleWriterGroup {
-  val writers: Array[DiskBlockObjectWriter]
-
-  /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
-  def releaseWriters(success: Boolean): Unit
-}
-
-/**
- * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
- * per reducer.
- */
-// Note: Changes to the format in this file should be kept in sync with
-// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData().
-private[spark] class FileShuffleBlockResolver(conf: SparkConf)
-  extends ShuffleBlockResolver with Logging {
-
-  private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
-
-  private lazy val blockManager = SparkEnv.get.blockManager
-
-  // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
-  private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
-
-  /**
-   * Contains all the state related to a particular shuffle.
-   */
-  private class ShuffleState(val numReducers: Int) {
-    /**
-     * The mapIds of all map tasks completed on this Executor for this shuffle.
-     */
-    val completedMapTasks = new ConcurrentLinkedQueue[Int]()
-  }
-
-  private val shuffleStates = new ConcurrentHashMap[ShuffleId, ShuffleState]
-
-  /**
-   * Get a ShuffleWriterGroup for the given map task, which will register it as complete
-   * when the writers are closed successfully
-   */
-  def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer,
-      writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
-    new ShuffleWriterGroup {
-      private val shuffleState: ShuffleState = {
-        // Note: we do _not_ want to just wrap this java ConcurrentHashMap into a Scala map and use
-        // .getOrElseUpdate() because that's actually NOT atomic.
-        shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers))
-        shuffleStates.get(shuffleId)
-      }
-      val openStartTime = System.nanoTime
-      val serializerInstance = serializer.newInstance()
-      val writers: Array[DiskBlockObjectWriter] = {
-        Array.tabulate[DiskBlockObjectWriter](numReducers) { bucketId =>
-          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          val blockFile = blockManager.diskBlockManager.getFile(blockId)
-          val tmp = Utils.tempFileWith(blockFile)
-          blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize, writeMetrics)
-        }
-      }
-      // Creating the file to write to and creating a disk writer both involve interacting with
-      // the disk, so should be included in the shuffle write time.
-      writeMetrics.incWriteTime(System.nanoTime - openStartTime)
-
-      override def releaseWriters(success: Boolean) {
-        shuffleState.completedMapTasks.add(mapId)
-      }
-    }
-  }
-
-  override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
-    val file = blockManager.diskBlockManager.getFile(blockId)
-    new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
-  }
-
-  /** Remove all the blocks / files and metadata related to a particular shuffle. */
-  def removeShuffle(shuffleId: ShuffleId): Boolean = {
-    // Do not change the ordering of this, if shuffleStates should be removed only
-    // after the corresponding shuffle blocks have been removed
-    val cleaned = removeShuffleBlocks(shuffleId)
-    shuffleStates.remove(shuffleId)
-    cleaned
-  }
-
-  /** Remove all the blocks / files related to a particular shuffle. */
-  private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
-    Option(shuffleStates.get(shuffleId)) match {
-      case Some(state) =>
-        for (mapId <- state.completedMapTasks.asScala; reduceId <- 0 until state.numReducers) {
-          val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
-          val file = blockManager.diskBlockManager.getFile(blockId)
-          if (!file.delete()) {
-            logWarning(s"Error deleting ${file.getPath()}")
-          }
-        }
-        logInfo("Deleted all files for shuffle " + shuffleId)
-        true
-      case None =>
-        logInfo("Could not find files for shuffle " + shuffleId + " for deleting")
-        false
-    }
-  }
-
-  override def stop(): Unit = {}
-}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
deleted file mode 100644
index 6bb4ff94b546d7d2da6f07992b17b572b40c536b..0000000000000000000000000000000000000000
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.hash
-
-import org.apache.spark._
-import org.apache.spark.internal.Logging
-import org.apache.spark.shuffle._
-
-/**
- * A ShuffleManager using hashing, that creates one output file per reduce partition on each
- * mapper (possibly reusing these across waves of tasks).
- */
-private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
-
-  if (!conf.getBoolean("spark.shuffle.spill", true)) {
-    logWarning(
-      "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
-        " Shuffle will continue to spill to disk when necessary.")
-  }
-
-  private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)
-
-  override val shortName: String = "hash"
-
-  /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
-  override def registerShuffle[K, V, C](
-      shuffleId: Int,
-      numMaps: Int,
-      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
-    new BaseShuffleHandle(shuffleId, numMaps, dependency)
-  }
-
-  /**
-   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
-   * Called on executors by reduce tasks.
-   */
-  override def getReader[K, C](
-      handle: ShuffleHandle,
-      startPartition: Int,
-      endPartition: Int,
-      context: TaskContext): ShuffleReader[K, C] = {
-    new BlockStoreShuffleReader(
-      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
-  }
-
-  /** Get a writer for a given partition. Called on executors by map tasks. */
-  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
-    : ShuffleWriter[K, V] = {
-    new HashShuffleWriter(
-      shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
-  }
-
-  /** Remove a shuffle's metadata from the ShuffleManager. */
-  override def unregisterShuffle(shuffleId: Int): Boolean = {
-    shuffleBlockResolver.removeShuffle(shuffleId)
-  }
-
-  override def shuffleBlockResolver: FileShuffleBlockResolver = {
-    fileShuffleBlockResolver
-  }
-
-  /** Shut down this ShuffleManager. */
-  override def stop(): Unit = {
-    shuffleBlockResolver.stop()
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
deleted file mode 100644
index 6c4444ffb473275b6cc0ecbee7a109a9450b476b..0000000000000000000000000000000000000000
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.hash
-
-import java.io.IOException
-
-import org.apache.spark._
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.shuffle._
-import org.apache.spark.storage.DiskBlockObjectWriter
-
-private[spark] class HashShuffleWriter[K, V](
-    shuffleBlockResolver: FileShuffleBlockResolver,
-    handle: BaseShuffleHandle[K, V, _],
-    mapId: Int,
-    context: TaskContext)
-  extends ShuffleWriter[K, V] with Logging {
-
-  private val dep = handle.dependency
-  private val numOutputSplits = dep.partitioner.numPartitions
-  private val metrics = context.taskMetrics
-
-  // Are we in the process of stopping? Because map tasks can call stop() with success = true
-  // and then call stop() with success = false if they get an exception, we want to make sure
-  // we don't try deleting files, etc twice.
-  private var stopping = false
-
-  private val writeMetrics = metrics.shuffleWriteMetrics
-
-  private val blockManager = SparkEnv.get.blockManager
-  private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits,
-    dep.serializer, writeMetrics)
-
-  /** Write a bunch of records to this task's output */
-  override def write(records: Iterator[Product2[K, V]]): Unit = {
-    val iter = if (dep.aggregator.isDefined) {
-      if (dep.mapSideCombine) {
-        dep.aggregator.get.combineValuesByKey(records, context)
-      } else {
-        records
-      }
-    } else {
-      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
-      records
-    }
-
-    for (elem <- iter) {
-      val bucketId = dep.partitioner.getPartition(elem._1)
-      shuffle.writers(bucketId).write(elem._1, elem._2)
-    }
-  }
-
-  /** Close this writer, passing along whether the map completed */
-  override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
-    var success = initiallySuccess
-    try {
-      if (stopping) {
-        return None
-      }
-      stopping = true
-      if (success) {
-        try {
-          Some(commitWritesAndBuildStatus())
-        } catch {
-          case e: Exception =>
-            success = false
-            revertWrites()
-            throw e
-        }
-      } else {
-        revertWrites()
-        None
-      }
-    } finally {
-      // Release the writers back to the shuffle block manager.
-      if (shuffle != null && shuffle.writers != null) {
-        try {
-          shuffle.releaseWriters(success)
-        } catch {
-          case e: Exception => logError("Failed to release shuffle writers", e)
-        }
-      }
-    }
-  }
-
-  private def commitWritesAndBuildStatus(): MapStatus = {
-    // Commit the writes. Get the size of each bucket block (total block size).
-    val sizes: Array[Long] = shuffle.writers.map { writer: DiskBlockObjectWriter =>
-      writer.commitAndClose()
-      writer.fileSegment().length
-    }
-
-    // rename all shuffle files to final paths
-    // Note: there is only one ShuffleBlockResolver in executor
-    shuffleBlockResolver.synchronized {
-      shuffle.writers.zipWithIndex.foreach { case (writer, i) =>
-        val output = blockManager.diskBlockManager.getFile(writer.blockId)
-        if (sizes(i) > 0) {
-          if (output.exists()) {
-            // Use length of existing file and delete our own temporary one
-            sizes(i) = output.length()
-            writer.file.delete()
-          } else {
-            // Commit by renaming our temporary file to something the fetcher expects
-            if (!writer.file.renameTo(output)) {
-              throw new IOException(s"fail to rename ${writer.file} to $output")
-            }
-          }
-        } else {
-          if (output.exists()) {
-            output.delete()
-          }
-        }
-      }
-    }
-    MapStatus(blockManager.shuffleServerId, sizes)
-  }
-
-  private def revertWrites(): Unit = {
-    if (shuffle != null && shuffle.writers != null) {
-      for (writer <- shuffle.writers) {
-        writer.revertPartialWritesAndClose()
-      }
-    }
-  }
-}
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index f98150536d8a8892a66cf0e9ec262400dbb97353..69ff6c7c28ee93313a95e6d3b8efcde81a4be66f 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -30,7 +30,6 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
-import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage._
 
@@ -39,7 +38,7 @@ import org.apache.spark.storage._
  * suitable for cleaner tests and provides some utility functions. Subclasses can use different
  * config options, in particular, a different shuffle manager class
  */
-abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[HashShuffleManager])
+abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager])
   extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
 {
   implicit val defaultTimeout = timeout(10000 millis)
@@ -353,84 +352,6 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
   }
 }
 
-
-/**
- * A copy of the shuffle tests for sort-based shuffle
- */
-class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[SortShuffleManager]) {
-  test("cleanup shuffle") {
-    val (rdd, shuffleDeps) = newRDDWithShuffleDependencies()
-    val collected = rdd.collect().toList
-    val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId))
-
-    // Explicit cleanup
-    shuffleDeps.foreach(s => cleaner.doCleanupShuffle(s.shuffleId, blocking = true))
-    tester.assertCleanup()
-
-    // Verify that shuffles can be re-executed after cleaning up
-    assert(rdd.collect().toList.equals(collected))
-  }
-
-  test("automatically cleanup shuffle") {
-    var rdd = newShuffleRDD()
-    rdd.count()
-
-    // Test that GC does not cause shuffle cleanup due to a strong reference
-    val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
-    runGC()
-    intercept[Exception] {
-      preGCTester.assertCleanup()(timeout(1000 millis))
-    }
-    rdd.count()  // Defeat early collection by the JVM
-
-    // Test that GC causes shuffle cleanup after dereferencing the RDD
-    val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
-    rdd = null  // Make RDD out of scope, so that corresponding shuffle goes out of scope
-    runGC()
-    postGCTester.assertCleanup()
-  }
-
-  test("automatically cleanup RDD + shuffle + broadcast in distributed mode") {
-    sc.stop()
-
-    val conf2 = new SparkConf()
-      .setMaster("local-cluster[2, 1, 1024]")
-      .setAppName("ContextCleanerSuite")
-      .set("spark.cleaner.referenceTracking.blocking", "true")
-      .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
-      .set("spark.shuffle.manager", shuffleManager.getName)
-    sc = new SparkContext(conf2)
-
-    val numRdds = 10
-    val numBroadcasts = 4 // Broadcasts are more costly
-    val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
-    val broadcastBuffer = (1 to numBroadcasts).map(i => newBroadcast).toBuffer
-    val rddIds = sc.persistentRdds.keys.toSeq
-    val shuffleIds = 0 until sc.newShuffleId()
-    val broadcastIds = broadcastBuffer.map(_.id)
-
-    val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
-    runGC()
-    intercept[Exception] {
-      preGCTester.assertCleanup()(timeout(1000 millis))
-    }
-
-    // Test that GC triggers the cleanup of all variables after the dereferencing them
-    val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
-    broadcastBuffer.clear()
-    rddBuffer.clear()
-    runGC()
-    postGCTester.assertCleanup()
-
-    // Make sure the broadcasted task closure no longer exists after GC.
-    val taskClosureBroadcastId = broadcastIds.max + 1
-    assert(sc.env.blockManager.master.getMatchingBlockIds({
-      case BroadcastBlockId(`taskClosureBroadcastId`, _) => true
-      case _ => false
-    }, askSlaves = true).isEmpty)
-  }
-}
-
-
 /**
  * Class to test whether RDDs, shuffles, etc. have been successfully cleaned.
  * The checkpoint here refers only to normal (reliable) checkpoints, not local checkpoints.
diff --git a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
deleted file mode 100644
index 10794235ed392878734dbdb9f8a5d7483e130423..0000000000000000000000000000000000000000
--- a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.scalatest.BeforeAndAfterAll
-
-class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
-
-  // This test suite should run all tests in ShuffleSuite with hash-based shuffle.
-
-  override def beforeAll() {
-    super.beforeAll()
-    conf.set("spark.shuffle.manager", "hash")
-  }
-}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index d26df7e760ceac47c524211f49301542df0e4888..d14728cb505550a90957e774e82117fae07cf4a6 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
-import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.StorageLevel._
 
 /** Testsuite that tests block replication in BlockManager */
@@ -44,7 +44,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
   private var master: BlockManagerMaster = null
   private val securityMgr = new SecurityManager(conf)
   private val mapOutputTracker = new MapOutputTrackerMaster(conf)
-  private val shuffleManager = new HashShuffleManager(conf)
+  private val shuffleManager = new SortShuffleManager(conf)
 
   // List of block manager created during an unit test, so that all of the them can be stopped
   // after the unit test.
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a1c2933584accc3d9a9c2217e3fcb222b03169aa..db1efaf2a20b8406ece4dd8406907cdc6cf72cf6 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.network.shuffle.BlockFetchingListener
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
-import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 import org.apache.spark.util._
 import org.apache.spark.util.io.ChunkedByteBuffer
@@ -60,7 +60,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   var master: BlockManagerMaster = null
   val securityMgr = new SecurityManager(new SparkConf(false))
   val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false))
-  val shuffleManager = new HashShuffleManager(new SparkConf(false))
+  val shuffleManager = new SortShuffleManager(new SparkConf(false))
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
   val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index dc3185a6d505a2b14221e33ecf4783f3fcfcc585..2410118fb7172c78d1a4a3e47eeb68a109cc2814 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -237,7 +237,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
   private def testSimpleSpilling(codec: Option[String] = None): Unit = {
     val size = 1000
     val conf = createSparkConf(loadDefaults = true, codec)  // Load defaults for Spark home
-    conf.set("spark.shuffle.manager", "hash") // avoid using external sorter
     conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
 
@@ -401,7 +400,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
   test("external aggregation updates peak execution memory") {
     val spillThreshold = 1000
     val conf = createSparkConf(loadDefaults = false)
-      .set("spark.shuffle.manager", "hash") // make sure we're not also using ExternalSorter
       .set("spark.shuffle.spill.numElementsForceSpillThreshold", spillThreshold.toString)
     sc = new SparkContext("local", "test", conf)
     // No spilling
diff --git a/docs/configuration.md b/docs/configuration.md
index 16d5be62f9e89019ff9a1fea6226a09d54710741..6512e16faf4c1a71a8afd4351e8ac1678c6e8c5d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -455,15 +455,6 @@ Apart from these, the following properties are also available, and may be useful
     is 15 seconds by default, calculated as <code>maxRetries * retryWait</code>.
   </td>
 </tr>
-<tr>
-  <td><code>spark.shuffle.manager</code></td>
-  <td>sort</td>
-  <td>
-    Implementation to use for shuffling data. There are two implementations available:
-    <code>sort</code> and <code>hash</code>.
-    Sort-based shuffle is more memory-efficient and is the default option starting in 1.2.
-  </td>
-</tr>
 <tr>
   <td><code>spark.shuffle.service.enabled</code></td>
   <td>false</td>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
index 7e35db7dd8a799acfe1424181588d64598a62158..d7deac93374c9551f7f9b056e65797cf717ac270 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
@@ -22,7 +22,6 @@ import java.util.Random
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
@@ -179,9 +178,6 @@ object ShuffleExchange {
         // copy.
         true
       }
-    } else if (shuffleManager.isInstanceOf[HashShuffleManager]) {
-      // We're using hash-based shuffle, so we don't need to copy.
-      false
     } else {
       // Catch-all case to safely handle any future ShuffleManager implementations.
       true
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 39d0de5179ea9d76c2fb5f7ba7ae794d5c2343de..4be4882938df568de9d7fa654379c5829dd11ace 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
-import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage._
 import org.apache.spark.streaming.receiver._
 import org.apache.spark.streaming.util._
@@ -58,7 +58,7 @@ class ReceivedBlockHandlerSuite
   val streamId = 1
   val securityMgr = new SecurityManager(conf)
   val mapOutputTracker = new MapOutputTrackerMaster(conf)
-  val shuffleManager = new HashShuffleManager(conf)
+  val shuffleManager = new SortShuffleManager(conf)
   val serializer = new KryoSerializer(conf)
   var serializerManager = new SerializerManager(serializer, conf)
   val manualClock = new ManualClock
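
With HashShuffleManager gone, spark.shuffle.manager accepts "sort", "tungsten-sort" (both resolving to SortShuffleManager via the shortShuffleMgrNames map changed above), or a fully qualified ShuffleManager class name; "hash" now fails at SparkEnv creation because no such class remains. A minimal configuration sketch follows (app and master values are illustrative):

import org.apache.spark.SparkConf;

public class SortShuffleConfSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
      .setAppName("sort-shuffle-example")
      .setMaster("local[2]")
      // "sort" is already the default; setting it explicitly documents intent.
      .set("spark.shuffle.manager", "sort");
  }
}
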