Commit 12bf8324 authored by Imran Rashid

[SPARK-19796][CORE] Fix serialization of long property values in TaskDescription

## What changes were proposed in this pull request?

The properties serialized with a TaskDescription can have very long values (e.g. "spark.job.description", which the Thrift server sets to the full SQL statement). DataOutputStream.writeUTF() throws a UTFDataFormatException for strings whose encoded form exceeds 65535 bytes, so this changes the way property values are serialized: the raw UTF-8 bytes are length-prefixed, so values of any length round-trip correctly.
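For context, writeUTF() stores the string's encoded length in an unsigned 16-bit prefix, which is why it fails once the encoding exceeds 65535 bytes. Below is a minimal, self-contained sketch of the length-prefix scheme the patch adopts -- illustrative only, not the patched Spark code (the object name `LongStringRoundTrip` is invented for the example):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

object LongStringRoundTrip {
  def main(args: Array[String]): Unit = {
    // ~100KB value, well past writeUTF's 65535-byte limit.
    val longValue = "1234567890" * 10000

    // Serialize: write the UTF-8 byte length as a 4-byte int, then the raw bytes.
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    val bytes = longValue.getBytes(StandardCharsets.UTF_8)
    out.writeInt(bytes.length)
    out.write(bytes)
    out.close()

    // Deserialize: read the length, then exactly that many bytes.
    val in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray))
    val valueBytes = new Array[Byte](in.readInt())
    in.readFully(valueBytes)
    assert(new String(valueBytes, StandardCharsets.UTF_8) == longValue)
  }
}
```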

## How was this patch tested?

Updated an existing unit test to reproduce the issue. Ran all unit tests via Jenkins.

Author: Imran Rashid <irashid@cloudera.com>

Closes #17140 from squito/SPARK-19796.
parent 096df6d9
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala

@@ -19,6 +19,7 @@ package org.apache.spark.scheduler

 import java.io.{DataInputStream, DataOutputStream}
 import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
 import java.util.Properties

 import scala.collection.JavaConverters._
@@ -86,7 +87,10 @@ private[spark] object TaskDescription {
     dataOut.writeInt(taskDescription.properties.size())
     taskDescription.properties.asScala.foreach { case (key, value) =>
       dataOut.writeUTF(key)
-      dataOut.writeUTF(value)
+      // SPARK-19796 -- writeUTF doesn't work for long strings, which can happen for property values
+      val bytes = value.getBytes(StandardCharsets.UTF_8)
+      dataOut.writeInt(bytes.length)
+      dataOut.write(bytes)
     }

     // Write the task. The task is already serialized, so write it directly to the byte buffer.
@@ -124,7 +128,11 @@
     val properties = new Properties()
     val numProperties = dataIn.readInt()
     for (i <- 0 until numProperties) {
-      properties.setProperty(dataIn.readUTF(), dataIn.readUTF())
+      val key = dataIn.readUTF()
+      val valueLength = dataIn.readInt()
+      val valueBytes = new Array[Byte](valueLength)
+      dataIn.readFully(valueBytes)
+      properties.setProperty(key, new String(valueBytes, StandardCharsets.UTF_8))
     }

     // Create a sub-buffer for the serialized task (to be deserialized later).
core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala

@@ -17,6 +17,7 @@

 package org.apache.spark.scheduler

+import java.io.{ByteArrayOutputStream, DataOutputStream, UTFDataFormatException}
 import java.nio.ByteBuffer
 import java.util.Properties
@@ -36,6 +37,21 @@ class TaskDescriptionSuite extends SparkFunSuite {
     val originalProperties = new Properties()
     originalProperties.put("property1", "18")
     originalProperties.put("property2", "test value")
+    // SPARK-19796 -- large property values (like a large job description for a long sql query)
+    // can cause problems for DataOutputStream; make sure we handle them correctly
+    val sb = new StringBuilder()
+    (0 to 10000).foreach(_ => sb.append("1234567890"))
+    val largeString = sb.toString()
+    originalProperties.put("property3", largeString)
+    // make sure we've got a good test case
+    intercept[UTFDataFormatException] {
+      val out = new DataOutputStream(new ByteArrayOutputStream())
+      try {
+        out.writeUTF(largeString)
+      } finally {
+        out.close()
+      }
+    }

     // Create a dummy byte buffer for the task.
     val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))