Skip to content
Snippets Groups Projects
Commit c0b3e45e authored by Kris Mok's avatar Kris Mok Committed by Xiao Li
Browse files

[SPARK-20872][SQL] ShuffleExchange.nodeName should handle null coordinator

## What changes were proposed in this pull request?

A one-liner change in `ShuffleExchange.nodeName` to cover the case when `coordinator` is `null`, so that the match expression is exhaustive.

Please refer to [SPARK-20872](https://issues.apache.org/jira/browse/SPARK-20872) for a description of the symptoms.
TL;DR is that inspecting a `ShuffleExchange` (directly or transitively) on the Executor side can hit a case where the `coordinator` field of a `ShuffleExchange` is null, and thus will trigger a `MatchError` in `ShuffleExchange.nodeName()`'s inexhaustive match expression.

Also changed two other match conditions in `ShuffleExchange` on the `coordinator` field to be consistent.

## How was this patch tested?

Manually tested this change with a case where the `coordinator` is null to make sure `ShuffleExchange.nodeName` doesn't throw a `MatchError` any more.

Author: Kris Mok <kris.mok@databricks.com>

Closes #18095 from rednaxelafx/shuffleexchange-nodename.
parent 95aef660
No related branches found
No related tags found
No related merge requests found
......@@ -40,6 +40,9 @@ case class ShuffleExchange(
child: SparkPlan,
@transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
// NOTE: coordinator can be null after serialization/deserialization,
// e.g. it can be null on the Executor side
override lazy val metrics = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
......@@ -47,7 +50,7 @@ case class ShuffleExchange(
val extraInfo = coordinator match {
case Some(exchangeCoordinator) =>
s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})"
case None => ""
case _ => ""
}
val simpleNodeName = "Exchange"
......@@ -70,7 +73,7 @@ case class ShuffleExchange(
// the plan.
coordinator match {
case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
case None =>
case _ =>
}
}
......@@ -117,7 +120,7 @@ case class ShuffleExchange(
val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
shuffleRDD
case None =>
case _ =>
val shuffleDependency = prepareShuffleDependency()
preparePostShuffleRDD(shuffleDependency)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment