diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala index d2c2db51769ba69a4295d5960153be05c8d62eef..cbf0704c4a9a4bf092566cae431ff616c456749c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala @@ -415,8 +415,9 @@ private[parquet] class CatalystRowConverter( private val elementConverter: Converter = { val repeatedType = parquetSchema.getType(0) val elementType = catalystSchema.elementType + val parentName = parquetSchema.getName - if (isElementType(repeatedType, elementType)) { + if (isElementType(repeatedType, elementType, parentName)) { newConverter(repeatedType, elementType, new ParentContainerUpdater { override def set(value: Any): Unit = currentArray += value }) @@ -453,10 +454,13 @@ private[parquet] class CatalystRowConverter( * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules */ // scalastyle:on - private def isElementType(parquetRepeatedType: Type, catalystElementType: DataType): Boolean = { + private def isElementType( + parquetRepeatedType: Type, catalystElementType: DataType, parentName: String): Boolean = { (parquetRepeatedType, catalystElementType) match { case (t: PrimitiveType, _) => true case (t: GroupType, _) if t.getFieldCount > 1 => true + case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" => true + case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parentName + "_tuple" => true case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true case _ => false } @@ -474,15 +478,9 @@ private[parquet] class CatalystRowConverter( override def getConverter(fieldIndex: Int): Converter = converter - override def end(): Unit = { - converter.updater.end() - currentArray += currentElement - } + override def end(): Unit = currentArray += currentElement - override def start(): Unit = { - converter.updater.start() - currentElement = null - } + override def start(): Unit = currentElement = null } }