From 18b3b00ecfde6c694fb6fee4f4d07d04e3d08ccf Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski <julek@databricks.com> Date: Mon, 10 Jul 2017 09:26:42 -0700 Subject: [PATCH] [SPARK-21272] SortMergeJoin LeftAnti does not update numOutputRows ## What changes were proposed in this pull request? Updating numOutputRows metric was missing from one return path of LeftAnti SortMergeJoin. ## How was this patch tested? Non-zero output rows manually seen in metrics. Author: Juliusz Sompolski <julek@databricks.com> Closes #18494 from juliuszsompolski/SPARK-21272. --- .../sql/execution/joins/SortMergeJoinExec.scala | 1 + .../spark/sql/execution/metric/SQLMetricsSuite.scala | 12 ++++++++++++ 2 files changed, 13 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index 8445c26eee..639b8e00c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -290,6 +290,7 @@ case class SortMergeJoinExec( currentLeftRow = smjScanner.getStreamedRow val currentRightMatches = smjScanner.getBufferedMatches if (currentRightMatches == null || currentRightMatches.length == 0) { + numOutputRows += 1 return true } var found = false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index cb3405b2fe..2911cbbeee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -483,6 +483,18 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { } } + test("SortMergeJoin(left-anti) metrics") { + val anti = testData2.filter("a > 2") + withTempView("antiData") { + anti.createOrReplaceTempView("antiData") + val df = spark.sql( + "SELECT * FROM testData2 ANTI JOIN antiData ON testData2.a = antiData.a") + testSparkPlanMetrics(df, 1, Map( + 0L -> ("SortMergeJoin", Map("number of output rows" -> 4L))) + ) + } + } + test("save metrics") { withTempPath { file => // person creates a temporary view. get the DF before listing previous execution IDs -- GitLab