Commit f5abd271 authored by Takuya UESHIN, committed by Michael Armbrust

[SPARK-2415] [SQL] RowWriteSupport should handle empty ArrayType correctly.

`RowWriteSupport` doesn't write empty `ArrayType` values, so they are read back as `null`.
It should write empty `ArrayType` values as they are.
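
For context, a minimal sketch of the symptom this fixes (hypothetical record type and output path; assumes the Spark 1.0-era `SchemaRDD` API and the implicits from `TestSQLContext`):

```scala
import org.apache.spark.sql.test.TestSQLContext
import TestSQLContext._  // implicit RDD -> SchemaRDD conversion

case class Record(values: Seq[Int])  // hypothetical schema with an ArrayType field

val rdd = sparkContext.parallelize(Record(Seq.empty[Int]) :: Nil)
rdd.saveAsParquetFile("/tmp/spark-2415-repro")  // hypothetical path

// Before this fix the empty array was never written, so it read back as null;
// after the fix it reads back as an empty Seq.
parquetFile("/tmp/spark-2415-repro").collect()(0)(0)
```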

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1339 from ueshin/issues/SPARK-2415 and squashes the following commits:

32afc87 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-2415
2f05196 [Takuya UESHIN] Fix RowWriteSupport to handle empty ArrayType correctly.
parent f62c4272
@@ -229,9 +229,9 @@ private[parquet] class CatalystGroupConverter(
     this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null)

   protected [parquet] val converters: Array[Converter] =
-    schema.map(field =>
-      CatalystConverter.createConverter(field, schema.indexOf(field), this))
-    .toArray
+    schema.zipWithIndex.map {
+      case (field, idx) => CatalystConverter.createConverter(field, idx, this)
+    }.toArray

   override val size = schema.size
@@ -288,9 +288,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
     new ParquetRelation.RowType(attributes.length))

   protected [parquet] val converters: Array[Converter] =
-    schema.map(field =>
-      CatalystConverter.createConverter(field, schema.indexOf(field), this))
-    .toArray
+    schema.zipWithIndex.map {
+      case (field, idx) => CatalystConverter.createConverter(field, idx, this)
+    }.toArray

   override val size = schema.size
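
The converter change in both hunks above is a cleanup alongside the fix: `indexOf` rescans the schema for every field (quadratic), and since it returns the position of the first equal element, two identically declared fields would share a converter index. `zipWithIndex` pairs each field with its actual position. A sketch of the difference on plain values:

```scala
val fields = Seq("a", "b", "a")               // stand-ins for two structurally equal fields
fields.map(f => fields.indexOf(f))            // List(0, 1, 0) -- second "a" gets the wrong index
fields.zipWithIndex.map { case (_, i) => i }  // List(0, 1, 2) -- every field keeps its own position
```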
@@ -156,7 +156,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
     writer.startMessage()
     while(index < attributes.size) {
       // null values indicate optional fields but we do not check currently
-      if (record(index) != null && record(index) != Nil) {
+      if (record(index) != null) {
         writer.startField(attributes(index).name, index)
         writeValue(attributes(index).dataType, record(index))
         writer.endField(attributes(index).name, index)
@@ -167,7 +167,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   }

   private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
-    if (value != null && value != Nil) {
+    if (value != null) {
       schema match {
         case t @ ArrayType(_) => writeArray(
           t,
@@ -184,7 +184,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   }

   private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = {
-    if (value != null && value != Nil) {
+    if (value != null) {
       schema match {
         case StringType => writer.addBinary(
           Binary.fromByteArray(
@@ -206,12 +206,12 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   private[parquet] def writeStruct(
       schema: StructType,
       struct: CatalystConverter.StructScalaType[_]): Unit = {
-    if (struct != null && struct != Nil) {
+    if (struct != null) {
       val fields = schema.fields.toArray
       writer.startGroup()
       var i = 0
       while(i < fields.size) {
-        if (struct(i) != null && struct(i) != Nil) {
+        if (struct(i) != null) {
           writer.startField(fields(i).name, i)
           writeValue(fields(i).dataType, struct(i))
           writer.endField(fields(i).name, i)
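
All four `RowWriteSupport` hunks remove the same guard. The `!= Nil` comparison looks like a safety check but is exactly what ate empty arrays: Scala `Seq` equality is element-based, so every empty sequence compares equal to `Nil`, and the writer skipped the field entirely. A quick illustration:

```scala
Seq.empty[Int] == Nil   // true  -- the old guard treated an empty array as "absent"
Vector() == Nil         // true  -- equality ignores the concrete Seq implementation
null == Nil             // false -- the null check alone is the right test
```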
@@ -78,7 +78,7 @@ case class AllDataTypesWithNonPrimitiveType(
     booleanField: Boolean,
     array: Seq[Int],
     map: Map[Int, String],
-    nested: Nested)
+    data: Data)

 class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   TestData // Load test data tables.
@@ -138,7 +138,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
     TestSQLContext.sparkContext.parallelize(range)
       .map(x => AllDataTypesWithNonPrimitiveType(
         s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
-        Seq(x), Map(x -> s"$x"), Nested(x, s"$x")))
+        (0 until x), (0 until x).map(i => i -> s"$i").toMap, Data((0 until x), Nested(x, s"$x"))))
       .saveAsParquetFile(tempDir)
     val result = parquetFile(tempDir).collect()
     range.foreach {
@@ -151,9 +151,9 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
       assert(result(i).getShort(5) === i.toShort)
       assert(result(i).getByte(6) === i.toByte)
       assert(result(i).getBoolean(7) === (i % 2 == 0))
-      assert(result(i)(8) === Seq(i))
-      assert(result(i)(9) === Map(i -> s"$i"))
-      assert(result(i)(10) === new GenericRow(Array[Any](i, s"$i")))
+      assert(result(i)(8) === (0 until i))
+      assert(result(i)(9) === (0 until i).map(i => i -> s"$i").toMap)
+      assert(result(i)(10) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
     }
   }
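
The reworked fixture is what actually exercises the fix: assuming `range` starts at 0 (as the `result(i)` indexing suggests), the first row has `x = 0`, so every non-primitive field is empty and would previously have read back as `null`:

```scala
(0 until 0)                             // empty Range -- hits the empty ArrayType path
(0 until 0).map(i => i -> s"$i").toMap  // empty Map   -- hits the empty MapType path
Data((0 until 0), Nested(0, "0"))       // struct containing an empty array
```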