Skip to content
Snippets Groups Projects
Commit e57d6b56 authored by Wenchen Fan's avatar Wenchen Fan Committed by Davies Liu
Browse files

[SPARK-9683] [SQL] copy UTF8String when convert unsafe array/map to safe

When we convert an unsafe row to a safe row, we will do a copy if the column is of struct or string type. However, the strings inside an unsafe array/map are not copied, which may cause problems.

Author: Wenchen Fan <cloud0fan@outlook.com>

Closes #7990 from cloud-fan/copy and squashes the following commits:

c13d1e3 [Wenchen Fan] change test name
fe36294 [Wenchen Fan] we should deep copy UTF8String when convert unsafe row to safe row
parent 15bd6f33
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions ...@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
case class FromUnsafe(child: Expression) extends UnaryExpression case class FromUnsafe(child: Expression) extends UnaryExpression
with ExpectsInputTypes with CodegenFallback { with ExpectsInputTypes with CodegenFallback {
...@@ -52,6 +53,8 @@ case class FromUnsafe(child: Expression) extends UnaryExpression ...@@ -52,6 +53,8 @@ case class FromUnsafe(child: Expression) extends UnaryExpression
} }
new GenericArrayData(result) new GenericArrayData(result)
case StringType => value.asInstanceOf[UTF8String].clone()
case MapType(kt, vt, _) => case MapType(kt, vt, _) =>
val map = value.asInstanceOf[UnsafeMapData] val map = value.asInstanceOf[UnsafeMapData]
val safeKeyArray = convert(map.keys, ArrayType(kt)).asInstanceOf[GenericArrayData] val safeKeyArray = convert(map.keys, ArrayType(kt)).asInstanceOf[GenericArrayData]
......
...@@ -17,9 +17,13 @@ ...@@ -17,9 +17,13 @@
package org.apache.spark.sql.execution package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Literal, IsNull} import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute, Literal, IsNull}
import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.types.{GenericArrayData, ArrayType, StructType, StringType}
import org.apache.spark.unsafe.types.UTF8String
class RowFormatConvertersSuite extends SparkPlanTest { class RowFormatConvertersSuite extends SparkPlanTest {
...@@ -87,4 +91,36 @@ class RowFormatConvertersSuite extends SparkPlanTest { ...@@ -87,4 +91,36 @@ class RowFormatConvertersSuite extends SparkPlanTest {
input.map(Row.fromTuple) input.map(Row.fromTuple)
) )
} }
test("SPARK-9683: copy UTF8String when convert unsafe array/map to safe") {
  SparkPlan.currentContext.set(TestSQLContext)

  // One hundred single-element string arrays, one per input row.
  val elementSchema = ArrayType(StringType)
  val inputRows = (1 to 100).map { n =>
    InternalRow(new GenericArrayData(Array[Any](UTF8String.fromString(n.toString))))
  }
  val scan = LocalTableScan(Seq(AttributeReference("t", elementSchema)()), inputRows)

  // Round-trip through the unsafe format; DummyPlan then caches the strings,
  // which would expose any UTF8String that ConvertToSafe failed to deep-copy.
  val plan = DummyPlan(ConvertToSafe(ConvertToUnsafe(scan)))

  val observed = plan.execute().collect().map(_.getUTF8String(0).toString)
  assert(observed === (1 to 100).map(_.toString))
}
}
/**
 * Test-only operator that eagerly materializes the first string of each row's
 * array column before re-emitting it. If the incoming "safe" rows still held
 * shallow (memory-backed) UTF8Strings, caching them across the whole partition
 * would surface the corruption this suite guards against (SPARK-9683).
 */
case class DummyPlan(child: SparkPlan) extends UnaryNode {

  override protected def doExecute(): RDD[InternalRow] = {
    child.execute().mapPartitions { rows =>
      // Eagerly collect every string first (toVector forces the iterator) so we
      // only see correct values if each UTF8String was deep-copied upstream.
      val cached = rows.map(r => r.getArray(0).getUTF8String(0)).toVector
      cached.iterator.map(InternalRow(_))
    }
  }

  override def output: Seq[Attribute] = Seq(AttributeReference("a", StringType)())
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment