Commit 1ca5e2e0 authored by Josh Rosen, committed by Reynold Xin

[SPARK-10704] Rename HashShuffleReader to BlockStoreShuffleReader

The current shuffle code has an interface named ShuffleReader with only one implementation, HashShuffleReader. That name is confusing, since the same read-path code is used for both sort- and hash-based shuffle. This patch resolves the confusion by renaming HashShuffleReader to BlockStoreShuffleReader.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #8825 from JoshRosen/shuffle-reader-cleanup.
parent 22d40159
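
For context, the abstraction involved can be sketched as follows. This is a simplified paraphrase of the org.apache.spark.shuffle API, not a verbatim excerpt: a single ShuffleReader trait whose sole implementation (previously named HashShuffleReader) is handed out by both the hash- and sort-based shuffle managers.

// Simplified sketch of the ShuffleReader abstraction; paraphrased,
// not copied from the Spark source.
trait ShuffleReader[K, C] {
  /** Read the combined key-value pairs for this reduce task. */
  def read(): Iterator[Product2[K, C]]
}

// The single implementation, formerly named HashShuffleReader. Both
// HashShuffleManager and SortShuffleManager return this same class from
// getReader(), which is what made the old name misleading.
class BlockStoreShuffleReader[K, C] extends ShuffleReader[K, C] {
  override def read(): Iterator[Product2[K, C]] = {
    // Fetch the relevant shuffle blocks from the block store (via
    // ShuffleBlockFetcherIterator), then deserialize and, if needed,
    // aggregate or sort the records. Body elided in this sketch.
    ???
  }
}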
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -15,16 +15,15 @@
  * limitations under the License.
  */

-package org.apache.spark.shuffle.hash
+package org.apache.spark.shuffle

 import org.apache.spark._
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}
 import org.apache.spark.storage.{BlockManager, ShuffleBlockFetcherIterator}
 import org.apache.spark.util.CompletionIterator
 import org.apache.spark.util.collection.ExternalSorter

-private[spark] class HashShuffleReader[K, C](
+private[spark] class BlockStoreShuffleReader[K, C](
     handle: BaseShuffleHandle[K, _, C],
     startPartition: Int,
     endPartition: Int,
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -51,7 +51,7 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager
       startPartition: Int,
       endPartition: Int,
       context: TaskContext): ShuffleReader[K, C] = {
-    new HashShuffleReader(
+    new BlockStoreShuffleReader(
       handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
   }
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentHashMap

 import org.apache.spark.{Logging, SparkConf, TaskContext, ShuffleDependency}
 import org.apache.spark.shuffle._
-import org.apache.spark.shuffle.hash.HashShuffleReader

 private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {

@@ -54,7 +53,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       endPartition: Int,
       context: TaskContext): ShuffleReader[K, C] = {
     // We currently use the same block store shuffle fetcher as the hash-based shuffle.
-    new HashShuffleReader(
+    new BlockStoreShuffleReader(
       handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
   }
--- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.shuffle.hash
+package org.apache.spark.shuffle

 import java.io.{ByteArrayOutputStream, InputStream}
 import java.nio.ByteBuffer
@@ -28,7 +28,6 @@ import org.mockito.stubbing.Answer
 import org.apache.spark._
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.shuffle.BaseShuffleHandle
 import org.apache.spark.storage.{BlockManager, BlockManagerId, ShuffleBlockId}

 /**
@@ -56,7 +55,7 @@ class RecordingManagedBuffer(underlyingBuffer: NioManagedBuffer) extends Managed
   }
 }

-class HashShuffleReaderSuite extends SparkFunSuite with LocalSparkContext {
+class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext {

   /**
    * This test makes sure that, when data is read from a HashShuffleReader, the underlying
@@ -134,7 +133,7 @@ class HashShuffleReaderSuite extends SparkFunSuite with LocalSparkContext {
       new BaseShuffleHandle(shuffleId, numMaps, dependency)
     }

-    val shuffleReader = new HashShuffleReader(
+    val shuffleReader = new BlockStoreShuffleReader(
       shuffleHandle,
       reduceId,
       reduceId + 1,
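
For reference, the call site that exercises this reader is untouched by the patch: reduce tasks reach it through the ShuffleManager interface, regardless of which manager is configured. The sketch below paraphrases ShuffledRDD.compute; it relies on private[spark] types, so it compiles only inside the Spark source tree.

import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}

// Paraphrase of ShuffledRDD.compute: a reduce task asks the configured
// ShuffleManager for a reader over its single partition and drains it.
// Whether the manager is hash- or sort-based, the object returned is now
// a BlockStoreShuffleReader.
def computeSketch[K, V, C](
    dep: ShuffleDependency[K, V, C],
    split: Partition,
    context: TaskContext): Iterator[(K, C)] = {
  SparkEnv.get.shuffleManager
    .getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}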