Commit e8b1955e authored by Josh Rosen, committed by Reynold Xin

[SPARK-18462] Fix ClassCastException in SparkListenerDriverAccumUpdates event

## What changes were proposed in this pull request?

This patch fixes a `ClassCastException: java.lang.Integer cannot be cast to java.lang.Long` error which could occur in the HistoryServer while trying to process a deserialized `SparkListenerDriverAccumUpdates` event.

The problem stems from how `jackson-module-scala` handles primitive type parameters (see https://github.com/FasterXML/jackson-module-scala/wiki/FAQ#deserializing-optionint-and-other-primitive-challenges for more details). Because of this, a field that our code expected to be deserialized as a `(Long, Long)` tuple actually arrived as an `(Int, Int)` tuple.
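
The following is a minimal sketch (not part of this patch) of the underlying behavior, assuming `jackson-module-scala` is on the classpath; the `Updates` case class and object name are hypothetical stand-ins for the event's `accumUpdates` field:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical stand-in for SparkListenerDriverAccumUpdates' accumUpdates field.
case class Updates(accumUpdates: Seq[(Long, Long)])

object TupleDeserializationSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val parsed = mapper.readValue("""{"accumUpdates": [[2, 3]]}""", classOf[Updates])
    // The tuple's Long type parameters are erased at runtime, so Jackson boxes the
    // small JSON numbers as java.lang.Integer rather than java.lang.Long. Unboxing
    // them as Long then fails with:
    //   java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
    val (a, b) = parsed.accumUpdates.head
    println(a + b)
  }
}
```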

This patch hacks around this issue by registering a custom `Converter` with Jackson in order to deserialize the tuples as `(Object, Object)` and perform the appropriate casting.

## How was this patch tested?

New regression tests in `SQLListenerSuite`.
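
The new tests can be run on their own with something like `build/sbt "sql/testOnly *SQLListenerSuite"` (assuming the standard Spark sbt project layout).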

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15922 from JoshRosen/SPARK-18462.

(cherry picked from commit d9dd979d)
Signed-off-by: Reynold Xin <rxin@databricks.com>
parent fc466be4
@@ -19,6 +19,11 @@ package org.apache.spark.sql.execution.ui
 
 import scala.collection.mutable
 
+import com.fasterxml.jackson.databind.JavaType
+import com.fasterxml.jackson.databind.`type`.TypeFactory
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import com.fasterxml.jackson.databind.util.Converter
+
 import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
@@ -43,9 +48,41 @@ case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerDriverAccumUpdates(executionId: Long, accumUpdates: Seq[(Long, Long)])
+case class SparkListenerDriverAccumUpdates(
+    executionId: Long,
+    @JsonDeserialize(contentConverter = classOf[LongLongTupleConverter])
+    accumUpdates: Seq[(Long, Long)])
   extends SparkListenerEvent
 
+/**
+ * Jackson [[Converter]] for converting an (Int, Int) tuple into a (Long, Long) tuple.
+ *
+ * This is necessary due to limitations in how Jackson's scala module deserializes primitives;
+ * see the "Deserializing Option[Int] and other primitive challenges" section in
+ * https://github.com/FasterXML/jackson-module-scala/wiki/FAQ for a discussion of this issue and
+ * SPARK-18462 for the specific problem that motivated this conversion.
+ */
+private class LongLongTupleConverter extends Converter[(Object, Object), (Long, Long)] {
+
+  override def convert(in: (Object, Object)): (Long, Long) = {
+    def toLong(a: Object): Long = a match {
+      case i: java.lang.Integer => i.intValue()
+      case l: java.lang.Long => l.longValue()
+    }
+    (toLong(in._1), toLong(in._2))
+  }
+
+  override def getInputType(typeFactory: TypeFactory): JavaType = {
+    val objectType = typeFactory.uncheckedSimpleType(classOf[Object])
+    typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(objectType, objectType))
+  }
+
+  override def getOutputType(typeFactory: TypeFactory): JavaType = {
+    val longType = typeFactory.uncheckedSimpleType(classOf[Long])
+    typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(longType, longType))
+  }
+}
+
 class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
 
   override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
......
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.ui
 
 import java.util.Properties
 
+import org.json4s.jackson.JsonMethods._
 import org.mockito.Mockito.mock
 
 import org.apache.spark._
@@ -35,10 +36,10 @@ import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanIn
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{AccumulatorMetadata, LongAccumulator}
+import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
 
-class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
+class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
   import testImplicits._
   import org.apache.spark.AccumulatorSuite.makeInfo
@@ -416,6 +417,45 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     assert(driverUpdates(physicalPlan.longMetric("dummy").id) == expectedAccumValue)
   }
 
+  test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol (SPARK-18462)") {
+    val event = SparkListenerDriverAccumUpdates(1L, Seq((2L, 3L)))
+    val json = JsonProtocol.sparkEventToJson(event)
+    assertValidDataInJson(json,
+      parse("""
+        |{
+        |  "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
+        |  "executionId": 1,
+        |  "accumUpdates": [[2,3]]
+        |}
+      """.stripMargin))
+    JsonProtocol.sparkEventFromJson(json) match {
+      case SparkListenerDriverAccumUpdates(executionId, accums) =>
+        assert(executionId == 1L)
+        accums.foreach { case (a, b) =>
+          assert(a == 2L)
+          assert(b == 3L)
+        }
+    }
+
+    // Test a case where the numbers in the JSON can only fit in longs:
+    val longJson = parse(
+      """
+        |{
+        |  "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
+        |  "executionId": 4294967294,
+        |  "accumUpdates": [[4294967294,3]]
+        |}
+      """.stripMargin)
+    JsonProtocol.sparkEventFromJson(longJson) match {
+      case SparkListenerDriverAccumUpdates(executionId, accums) =>
+        assert(executionId == 4294967294L)
+        accums.foreach { case (a, b) =>
+          assert(a == 4294967294L)
+          assert(b == 3L)
+        }
+    }
+  }
 }
......