diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index f92189b707fb59b3f58a729d4c5349e0605591ff..4cb0bd414243507e47127e71c0fe505a28ee2626 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -349,7 +349,6 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr } private[spark] object MapOutputTracker { - private val LOG_BASE = 1.1 // Serialize an array of map output locations into an efficient byte format so that we can send // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will @@ -385,34 +384,8 @@ private[spark] object MapOutputTracker { throw new MetadataFetchFailedException( shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId) } else { - (status.location, decompressSize(status.compressedSizes(reduceId))) + (status.location, status.getSizeForBlock(reduceId)) } } } - - /** - * Compress a size in bytes to 8 bits for efficient reporting of map output sizes. - * We do this by encoding the log base 1.1 of the size as an integer, which can support - * sizes up to 35 GB with at most 10% error. - */ - def compressSize(size: Long): Byte = { - if (size == 0) { - 0 - } else if (size <= 1L) { - 1 - } else { - math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte - } - } - - /** - * Decompress an 8-bit encoded block size, using the reverse operation of compressSize. - */ - def decompressSize(compressedSize: Byte): Long = { - if (compressedSize == 0) { - 0 - } else { - math.pow(LOG_BASE, compressedSize & 0xFF).toLong - } - } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index d3f63ff92ac6f480483f4b6acbda488c728e67a8..e25096ea92d70aeea9514f6dff274713c81d1214 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -24,22 +24,123 @@ import org.apache.spark.storage.BlockManagerId /** * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks. - * The map output sizes are compressed using MapOutputTracker.compressSize. */ -private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte]) - extends Externalizable { +private[spark] sealed trait MapStatus { + /** Location where this task was run. */ + def location: BlockManagerId - def this() = this(null, null) // For deserialization only + /** Estimated size for the reduce block, in bytes. */ + def getSizeForBlock(reduceId: Int): Long +} + + +private[spark] object MapStatus { + + def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = { + if (uncompressedSizes.length > 2000) { + new HighlyCompressedMapStatus(loc, uncompressedSizes) + } else { + new CompressedMapStatus(loc, uncompressedSizes) + } + } + + private[this] val LOG_BASE = 1.1 + + /** + * Compress a size in bytes to 8 bits for efficient reporting of map output sizes. + * We do this by encoding the log base 1.1 of the size as an integer, which can support + * sizes up to 35 GB with at most 10% error. + */ + def compressSize(size: Long): Byte = { + if (size == 0) { + 0 + } else if (size <= 1L) { + 1 + } else { + math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte + } + } + + /** + * Decompress an 8-bit encoded block size, using the reverse operation of compressSize. + */ + def decompressSize(compressedSize: Byte): Long = { + if (compressedSize == 0) { + 0 + } else { + math.pow(LOG_BASE, compressedSize & 0xFF).toLong + } + } +} + + +/** + * A [[MapStatus]] implementation that tracks the size of each block. Size for each block is + * represented using a single byte. + * + * @param loc location where the task is being executed. + * @param compressedSizes size of the blocks, indexed by reduce partition id. + */ +private[spark] class CompressedMapStatus( + private[this] var loc: BlockManagerId, + private[this] var compressedSizes: Array[Byte]) + extends MapStatus with Externalizable { + + protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only + + def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) { + this(loc, uncompressedSizes.map(MapStatus.compressSize)) + } - def writeExternal(out: ObjectOutput) { - location.writeExternal(out) + override def location: BlockManagerId = loc + + override def getSizeForBlock(reduceId: Int): Long = { + MapStatus.decompressSize(compressedSizes(reduceId)) + } + + override def writeExternal(out: ObjectOutput): Unit = { + loc.writeExternal(out) out.writeInt(compressedSizes.length) out.write(compressedSizes) } - def readExternal(in: ObjectInput) { - location = BlockManagerId(in) - compressedSizes = new Array[Byte](in.readInt()) + override def readExternal(in: ObjectInput): Unit = { + loc = BlockManagerId(in) + val len = in.readInt() + compressedSizes = new Array[Byte](len) in.readFully(compressedSizes) } } + + +/** + * A [[MapStatus]] implementation that only stores the average size of the blocks. + * + * @param loc location where the task is being executed. + * @param avgSize average size of all the blocks + */ +private[spark] class HighlyCompressedMapStatus( + private[this] var loc: BlockManagerId, + private[this] var avgSize: Long) + extends MapStatus with Externalizable { + + def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) { + this(loc, uncompressedSizes.sum / uncompressedSizes.length) + } + + protected def this() = this(null, 0L) // For deserialization only + + override def location: BlockManagerId = loc + + override def getSizeForBlock(reduceId: Int): Long = avgSize + + override def writeExternal(out: ObjectOutput): Unit = { + loc.writeExternal(out) + out.writeLong(avgSize) + } + + override def readExternal(in: ObjectInput): Unit = { + loc = BlockManagerId(in) + avgSize = in.readLong() + } +} diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index 4b9454d75abb7cc2c1367a7941007828232e6727..746ed33b54c00eb515a4006e6dac114df228d2c4 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -103,13 +103,11 @@ private[spark] class HashShuffleWriter[K, V]( private def commitWritesAndBuildStatus(): MapStatus = { // Commit the writes. Get the size of each bucket block (total block size). - val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter => + val sizes: Array[Long] = shuffle.writers.map { writer: BlockObjectWriter => writer.commitAndClose() - val size = writer.fileSegment().length - MapOutputTracker.compressSize(size) + writer.fileSegment().length } - - new MapStatus(blockManager.blockManagerId, compressedSizes) + MapStatus(blockManager.blockManagerId, sizes) } private def revertWrites(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 89a78d6982ba0fd6c1286a67f8d375436aecc60d..927481b72cf4f797a19774779ea42a6479b9374e 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -70,8 +70,7 @@ private[spark] class SortShuffleWriter[K, V, C]( val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths) - mapStatus = new MapStatus(blockManager.blockManagerId, - partitionLengths.map(MapOutputTracker.compressSize)) + mapStatus = MapStatus(blockManager.blockManagerId, partitionLengths) } /** Close this writer, passing along whether the map completed */ diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 5369169811f81ec23f82c3698d51521fc80cd288..1fef79ad1001f36cba610e61b31b92300a0ceead 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -23,32 +23,13 @@ import akka.actor._ import akka.testkit.TestActorRef import org.scalatest.FunSuite -import org.apache.spark.scheduler.MapStatus +import org.apache.spark.scheduler.{CompressedMapStatus, MapStatus} import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.AkkaUtils class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { private val conf = new SparkConf - test("compressSize") { - assert(MapOutputTracker.compressSize(0L) === 0) - assert(MapOutputTracker.compressSize(1L) === 1) - assert(MapOutputTracker.compressSize(2L) === 8) - assert(MapOutputTracker.compressSize(10L) === 25) - assert((MapOutputTracker.compressSize(1000000L) & 0xFF) === 145) - assert((MapOutputTracker.compressSize(1000000000L) & 0xFF) === 218) - // This last size is bigger than we can encode in a byte, so check that we just return 255 - assert((MapOutputTracker.compressSize(1000000000000000000L) & 0xFF) === 255) - } - - test("decompressSize") { - assert(MapOutputTracker.decompressSize(0) === 0) - for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) { - val size2 = MapOutputTracker.decompressSize(MapOutputTracker.compressSize(size)) - assert(size2 >= 0.99 * size && size2 <= 1.11 * size, - "size " + size + " decompressed to " + size2 + ", which is out of range") - } - } test("master start and stop") { val actorSystem = ActorSystem("test") @@ -65,14 +46,12 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf))) tracker.registerShuffle(10, 2) assert(tracker.containsShuffle(10)) - val compressedSize1000 = MapOutputTracker.compressSize(1000L) - val compressedSize10000 = MapOutputTracker.compressSize(10000L) - val size1000 = MapOutputTracker.decompressSize(compressedSize1000) - val size10000 = MapOutputTracker.decompressSize(compressedSize10000) - tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000), - Array(compressedSize1000, compressedSize10000))) - tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000), - Array(compressedSize10000, compressedSize1000))) + val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) + val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L)) + tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), + Array(1000L, 10000L))) + tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), + Array(10000L, 1000L))) val statuses = tracker.getServerStatuses(10, 0) assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000), (BlockManagerId("b", "hostB", 1000), size10000))) @@ -84,11 +63,11 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val tracker = new MapOutputTrackerMaster(conf) tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf))) tracker.registerShuffle(10, 2) - val compressedSize1000 = MapOutputTracker.compressSize(1000L) - val compressedSize10000 = MapOutputTracker.compressSize(10000L) - tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000), + val compressedSize1000 = MapStatus.compressSize(1000L) + val compressedSize10000 = MapStatus.compressSize(10000L) + tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), Array(compressedSize1000, compressedSize10000))) - tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000), + tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), Array(compressedSize10000, compressedSize1000))) assert(tracker.containsShuffle(10)) assert(tracker.getServerStatuses(10, 0).nonEmpty) @@ -103,11 +82,11 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf))) tracker.registerShuffle(10, 2) - val compressedSize1000 = MapOutputTracker.compressSize(1000L) - val compressedSize10000 = MapOutputTracker.compressSize(10000L) - tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000), + val compressedSize1000 = MapStatus.compressSize(1000L) + val compressedSize10000 = MapStatus.compressSize(10000L) + tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), Array(compressedSize1000, compressedSize1000, compressedSize1000))) - tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000), + tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), Array(compressedSize10000, compressedSize1000, compressedSize1000))) // As if we had two simultaneous fetch failures @@ -142,10 +121,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { slaveTracker.updateEpoch(masterTracker.getEpoch) intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } - val compressedSize1000 = MapOutputTracker.compressSize(1000L) - val size1000 = MapOutputTracker.decompressSize(compressedSize1000) - masterTracker.registerMapOutput(10, 0, new MapStatus( - BlockManagerId("a", "hostA", 1000), Array(compressedSize1000))) + val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) + masterTracker.registerMapOutput(10, 0, MapStatus( + BlockManagerId("a", "hostA", 1000), Array(1000L))) masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) assert(slaveTracker.getServerStatuses(10, 0).toSeq === @@ -173,8 +151,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { // Frame size should be ~123B, and no exception should be thrown masterTracker.registerShuffle(10, 1) - masterTracker.registerMapOutput(10, 0, new MapStatus( - BlockManagerId("88", "mph", 1000), Array.fill[Byte](10)(0))) + masterTracker.registerMapOutput(10, 0, MapStatus( + BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0))) masterActor.receive(GetMapOutputStatuses(10)) } @@ -194,8 +172,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { // being sent. masterTracker.registerShuffle(20, 100) (0 until 100).foreach { i => - masterTracker.registerMapOutput(20, i, new MapStatus( - BlockManagerId("999", "mps", 1000), Array.fill[Byte](4000000)(0))) + masterTracker.registerMapOutput(20, i, new CompressedMapStatus( + BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0))) } intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index aa73469b6acd8b96ef0dd1255db0043b31ae2dc1..a2e4f712db55b1c7807822c7b52b8babac92e055 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -740,7 +740,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F } private def makeMapStatus(host: String, reduces: Int): MapStatus = - new MapStatus(makeBlockManagerId(host), Array.fill[Byte](reduces)(2)) + MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(2)) private def makeBlockManagerId(host: String): BlockManagerId = BlockManagerId("exec-" + host, host, 12345) diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..79e04f046e4c4faf092d7dd7c549efb8afaa53b2 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.storage.BlockManagerId +import org.scalatest.FunSuite + +import org.apache.spark.SparkConf +import org.apache.spark.serializer.JavaSerializer + + +class MapStatusSuite extends FunSuite { + + test("compressSize") { + assert(MapStatus.compressSize(0L) === 0) + assert(MapStatus.compressSize(1L) === 1) + assert(MapStatus.compressSize(2L) === 8) + assert(MapStatus.compressSize(10L) === 25) + assert((MapStatus.compressSize(1000000L) & 0xFF) === 145) + assert((MapStatus.compressSize(1000000000L) & 0xFF) === 218) + // This last size is bigger than we can encode in a byte, so check that we just return 255 + assert((MapStatus.compressSize(1000000000000000000L) & 0xFF) === 255) + } + + test("decompressSize") { + assert(MapStatus.decompressSize(0) === 0) + for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) { + val size2 = MapStatus.decompressSize(MapStatus.compressSize(size)) + assert(size2 >= 0.99 * size && size2 <= 1.11 * size, + "size " + size + " decompressed to " + size2 + ", which is out of range") + } + } + + test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) { + val sizes = Array.fill[Long](2001)(150L) + val status = MapStatus(null, sizes) + assert(status.isInstanceOf[HighlyCompressedMapStatus]) + assert(status.getSizeForBlock(10) === 150L) + assert(status.getSizeForBlock(50) === 150L) + assert(status.getSizeForBlock(99) === 150L) + assert(status.getSizeForBlock(2000) === 150L) + } + + test(classOf[HighlyCompressedMapStatus].getName + ": estimated size is within 10%") { + val sizes = Array.tabulate[Long](50) { i => i.toLong } + val loc = BlockManagerId("a", "b", 10) + val status = MapStatus(loc, sizes) + val ser = new JavaSerializer(new SparkConf) + val buf = ser.newInstance().serialize(status) + val status1 = ser.newInstance().deserialize[MapStatus](buf) + assert(status1.location == loc) + for (i <- 0 until sizes.length) { + // make sure the estimated size is within 10% of the input; note that we skip the very small + // sizes because the compression is very lossy there. + val estimate = status1.getSizeForBlock(i) + if (estimate > 100) { + assert(math.abs(estimate - sizes(i)) * 10 <= sizes(i), + s"incorrect estimated size $estimate, original was ${sizes(i)}") + } + } + } + + test(classOf[HighlyCompressedMapStatus].getName + ": estimated size should be the average size") { + val sizes = Array.tabulate[Long](3000) { i => i.toLong } + val avg = sizes.sum / sizes.length + val loc = BlockManagerId("a", "b", 10) + val status = MapStatus(loc, sizes) + val ser = new JavaSerializer(new SparkConf) + val buf = ser.newInstance().serialize(status) + val status1 = ser.newInstance().deserialize[MapStatus](buf) + assert(status1.location == loc) + for (i <- 0 until 3000) { + val estimate = status1.getSizeForBlock(i) + assert(estimate === avg) + } + } +} diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala index 76bf4cfd112678d356d061cd649414d22c0ddf3d..7bca1711ae2269ebb67653945ca8c5a1bc114acf 100644 --- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala @@ -106,10 +106,9 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) - val compressedSize1000 = MapOutputTracker.compressSize(1000L) - val size1000 = MapOutputTracker.decompressSize(compressedSize1000) - masterTracker.registerMapOutput(10, 0, new MapStatus( - BlockManagerId("a", "hostA", 1000), Array(compressedSize1000))) + val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) + masterTracker.registerMapOutput(10, 0, + MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L))) masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) @@ -157,10 +156,9 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) - val compressedSize1000 = MapOutputTracker.compressSize(1000L) - val size1000 = MapOutputTracker.decompressSize(compressedSize1000) - masterTracker.registerMapOutput(10, 0, new MapStatus( - BlockManagerId("a", "hostA", 1000), Array(compressedSize1000))) + val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) + masterTracker.registerMapOutput(10, 0, MapStatus( + BlockManagerId("a", "hostA", 1000), Array(1000L))) masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 1adfaa18c6202f33e1ef4f367ea8430e982acb24..4076ebc6fc8d57b081fd9acf6c560150c401ed34 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -45,7 +45,10 @@ object MimaExcludes { ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL1"), ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2") + "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2"), + // MapStatus should be private[spark] + ProblemFilters.exclude[IncompatibleTemplateDefProblem]( + "org.apache.spark.scheduler.MapStatus") ) case v if v.startsWith("1.1") =>