diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 4c378a278b4c144b0f50ebc46e171c3b41fe616a..5fa584591d935c19c78cd54800a2d4b096fe3302 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -221,7 +221,7 @@ private[spark] class Executor( // directSend = sending directly back to the driver val serializedResult = { - if (resultSize > maxResultSize) { + if (maxResultSize > 0 && resultSize > maxResultSize) { logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " + s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " + s"dropping it.") diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index d8fb6403503438be800573adee2ace301a4c5a3d..cabdc655f89bf59aeaa141915cdbb26f0e722e55 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -536,7 +536,7 @@ private[spark] class TaskSetManager( calculatedTasks += 1 if (maxResultSize > 0 && totalResultSize > maxResultSize) { val msg = s"Total size of serialized results of ${calculatedTasks} tasks " + - s"(${Utils.bytesToString(totalResultSize)}) is bigger than maxResultSize " + + s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " + s"(${Utils.bytesToString(maxResultSize)})" logError(msg) abort(msg) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 1809b5396d53eac0e790d71bd95a15ec258053bb..472191551a01ffa2ca6a8622d62f3c96c47966a1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -579,13 +579,13 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { // single 10M result val thrown = intercept[SparkException] {sc.makeRDD(genBytes(10 << 20)(0), 1).collect()} - assert(thrown.getMessage().contains("bigger than maxResultSize")) + assert(thrown.getMessage().contains("bigger than spark.driver.maxResultSize")) // multiple 1M results val thrown2 = intercept[SparkException] { sc.makeRDD(0 until 10, 10).map(genBytes(1 << 20)).collect() } - assert(thrown2.getMessage().contains("bigger than maxResultSize")) + assert(thrown2.getMessage().contains("bigger than spark.driver.maxResultSize")) } test("speculative and noPref task should be scheduled after node-local") {