From 071bbad5db1096a548c886762b611a8484a52753 Mon Sep 17 00:00:00 2001
From: Damian Guy <damian.guy@gmail.com>
Date: Tue, 11 Aug 2015 12:46:33 +0800
Subject: [PATCH] [SPARK-9340] [SQL] Fixes converting unannotated Parquet lists

This PR is inspired by #8063 authored by dguy. Especially, testing Parquet files added here are all taken from that PR.

**Committer who merges this PR should attribute it to "Damian Guy <damian.guy@gmail.com>".**

----

SPARK-6776 and SPARK-6777 followed `parquet-avro` to implement backwards-compatibility rules defined in `parquet-format` spec. However, both Spark SQL and `parquet-avro` neglected the following statement in `parquet-format`:

> This does not affect repeated fields that are not annotated: A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor annotated by `LIST` or `MAP` should be interpreted as a required list of required elements where the element type is the type of the field.

One of the consequences is that Parquet files generated by `parquet-protobuf` containing unannotated repeated fields are not correctly converted to Catalyst arrays.

This PR fixes this issue by

1. Handling unannotated repeated fields in `CatalystSchemaConverter`.
2. Converting this kind of special repeated fields to Catalyst arrays in `CatalystRowConverter`.

   Two special converters, `RepeatedPrimitiveConverter` and `RepeatedGroupConverter`, are added. They delegate actual conversion work to a child `elementConverter` and accumulate elements in an `ArrayBuffer`.

   Two extra methods, `start()` and `end()`, are added to `ParentContainerUpdater` so that they can be used to initialize new `ArrayBuffer`s for unannotated repeated fields and propagate converted array values upstream.

Author: Cheng Lian <lian@databricks.com>

Closes #8070 from liancheng/spark-9340/unannotated-parquet-list and squashes the following commits:

ace6df7 [Cheng Lian] Moves ParquetProtobufCompatibilitySuite
f1c7bfd [Cheng Lian] Updates .rat-excludes
420ad2b [Cheng Lian] Fixes converting unannotated Parquet lists
---
 .rat-excludes                                 |   1 +
 .../parquet/CatalystRowConverter.scala        | 151 ++++++++++++++----
 .../parquet/CatalystSchemaConverter.scala     |   7 +-
 .../resources/nested-array-struct.parquet     | Bin 0 -> 775 bytes
 .../test/resources/old-repeated-int.parquet   | Bin 0 -> 389 bytes
 .../resources/old-repeated-message.parquet    | Bin 0 -> 600 bytes
 .../src/test/resources/old-repeated.parquet   | Bin 0 -> 432 bytes
 .../parquet-thrift-compat.snappy.parquet      | Bin
 .../resources/proto-repeated-string.parquet   | Bin 0 -> 411 bytes
 .../resources/proto-repeated-struct.parquet   | Bin 0 -> 608 bytes
 .../proto-struct-with-array-many.parquet      | Bin 0 -> 802 bytes
 .../resources/proto-struct-with-array.parquet | Bin 0 -> 1576 bytes
 .../ParquetProtobufCompatibilitySuite.scala   |  91 +++++++++++
 .../parquet/ParquetSchemaSuite.scala          |  30 ++++
 14 files changed, 247 insertions(+), 33 deletions(-)
 create mode 100644 sql/core/src/test/resources/nested-array-struct.parquet
 create mode 100644 sql/core/src/test/resources/old-repeated-int.parquet
 create mode 100644 sql/core/src/test/resources/old-repeated-message.parquet
 create mode 100644 sql/core/src/test/resources/old-repeated.parquet
 mode change 100755 => 100644 sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet
 create mode 100644 sql/core/src/test/resources/proto-repeated-string.parquet
 create mode 100644 sql/core/src/test/resources/proto-repeated-struct.parquet
 create mode 100644 sql/core/src/test/resources/proto-struct-with-array-many.parquet
 create mode 100644 sql/core/src/test/resources/proto-struct-with-array.parquet
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala

diff --git a/.rat-excludes b/.rat-excludes
index 7277146584..9165872b9f 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -94,3 +94,4 @@ INDEX
 gen-java.*
 .*avpr
 org.apache.spark.sql.sources.DataSourceRegister
+.*parquet
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 3542dfbae1..ab5a6ddd41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -21,11 +21,11 @@ import java.math.{BigDecimal, BigInteger}
 import java.nio.ByteOrder
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
+import org.apache.parquet.schema.OriginalType.LIST
 import org.apache.parquet.schema.Type.Repetition
 import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
 
@@ -42,6 +42,12 @@ import org.apache.spark.unsafe.types.UTF8String
  * values to an [[ArrayBuffer]].
  */
 private[parquet] trait ParentContainerUpdater {
+  /** Called before a record field is being converted */
+  def start(): Unit = ()
+
+  /** Called after a record field is being converted */
+  def end(): Unit = ()
+
   def set(value: Any): Unit = ()
   def setBoolean(value: Boolean): Unit = set(value)
   def setByte(value: Byte): Unit = set(value)
@@ -55,6 +61,32 @@ private[parquet] trait ParentContainerUpdater {
 /** A no-op updater used for root converter (who doesn't have a parent). */
 private[parquet] object NoopUpdater extends ParentContainerUpdater
 
+private[parquet] trait HasParentContainerUpdater {
+  def updater: ParentContainerUpdater
+}
+
+/**
+ * A convenient converter class for Parquet group types with an [[HasParentContainerUpdater]].
+ */
+private[parquet] abstract class CatalystGroupConverter(val updater: ParentContainerUpdater)
+  extends GroupConverter with HasParentContainerUpdater
+
+/**
+ * Parquet converter for Parquet primitive types.  Note that not all Spark SQL atomic types
+ * are handled by this converter.  Parquet primitive types are only a subset of those of Spark
+ * SQL.  For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
+ */
+private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUpdater)
+  extends PrimitiveConverter with HasParentContainerUpdater {
+
+  override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
+  override def addInt(value: Int): Unit = updater.setInt(value)
+  override def addLong(value: Long): Unit = updater.setLong(value)
+  override def addFloat(value: Float): Unit = updater.setFloat(value)
+  override def addDouble(value: Double): Unit = updater.setDouble(value)
+  override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
+}
+
 /**
  * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s.
  * Since any Parquet record is also a struct, this converter can also be used as root converter.
@@ -70,7 +102,7 @@ private[parquet] class CatalystRowConverter(
     parquetType: GroupType,
     catalystType: StructType,
     updater: ParentContainerUpdater)
-  extends GroupConverter {
+  extends CatalystGroupConverter(updater) {
 
   /**
    * Updater used together with field converters within a [[CatalystRowConverter]].  It propagates
@@ -89,13 +121,11 @@ private[parquet] class CatalystRowConverter(
 
   /**
    * Represents the converted row object once an entire Parquet record is converted.
-   *
-   * @todo Uses [[UnsafeRow]] for better performance.
    */
   val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
 
   // Converters for each field.
-  private val fieldConverters: Array[Converter] = {
+  private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
     parquetType.getFields.zip(catalystType).zipWithIndex.map {
       case ((parquetFieldType, catalystField), ordinal) =>
         // Converted field value should be set to the `ordinal`-th cell of `currentRow`
@@ -105,11 +135,19 @@ private[parquet] class CatalystRowConverter(
 
   override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)
 
-  override def end(): Unit = updater.set(currentRow)
+  override def end(): Unit = {
+    var i = 0
+    while (i < currentRow.numFields) {
+      fieldConverters(i).updater.end()
+      i += 1
+    }
+    updater.set(currentRow)
+  }
 
   override def start(): Unit = {
     var i = 0
     while (i < currentRow.numFields) {
+      fieldConverters(i).updater.start()
       currentRow.setNullAt(i)
       i += 1
     }
@@ -122,20 +160,20 @@ private[parquet] class CatalystRowConverter(
   private def newConverter(
       parquetType: Type,
       catalystType: DataType,
-      updater: ParentContainerUpdater): Converter = {
+      updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {
 
     catalystType match {
       case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
         new CatalystPrimitiveConverter(updater)
 
       case ByteType =>
-        new PrimitiveConverter {
+        new CatalystPrimitiveConverter(updater) {
           override def addInt(value: Int): Unit =
             updater.setByte(value.asInstanceOf[ByteType#InternalType])
         }
 
       case ShortType =>
-        new PrimitiveConverter {
+        new CatalystPrimitiveConverter(updater) {
           override def addInt(value: Int): Unit =
             updater.setShort(value.asInstanceOf[ShortType#InternalType])
         }
@@ -148,7 +186,7 @@ private[parquet] class CatalystRowConverter(
 
       case TimestampType =>
         // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
-        new PrimitiveConverter {
+        new CatalystPrimitiveConverter(updater) {
           // Converts nanosecond timestamps stored as INT96
           override def addBinary(value: Binary): Unit = {
             assert(
@@ -164,13 +202,23 @@ private[parquet] class CatalystRowConverter(
         }
 
       case DateType =>
-        new PrimitiveConverter {
+        new CatalystPrimitiveConverter(updater) {
           override def addInt(value: Int): Unit = {
             // DateType is not specialized in `SpecificMutableRow`, have to box it here.
             updater.set(value.asInstanceOf[DateType#InternalType])
           }
         }
 
+      // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
+      // annotated by `LIST` or `MAP` should be interpreted as a required list of required
+      // elements where the element type is the type of the field.
+      case t: ArrayType if parquetType.getOriginalType != LIST =>
+        if (parquetType.isPrimitive) {
+          new RepeatedPrimitiveConverter(parquetType, t.elementType, updater)
+        } else {
+          new RepeatedGroupConverter(parquetType, t.elementType, updater)
+        }
+
       case t: ArrayType =>
         new CatalystArrayConverter(parquetType.asGroupType(), t, updater)
 
@@ -195,27 +243,11 @@ private[parquet] class CatalystRowConverter(
     }
   }
 
-  /**
-   * Parquet converter for Parquet primitive types.  Note that not all Spark SQL atomic types
-   * are handled by this converter.  Parquet primitive types are only a subset of those of Spark
-   * SQL.  For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
-   */
-  private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater)
-    extends PrimitiveConverter {
-
-    override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
-    override def addInt(value: Int): Unit = updater.setInt(value)
-    override def addLong(value: Long): Unit = updater.setLong(value)
-    override def addFloat(value: Float): Unit = updater.setFloat(value)
-    override def addDouble(value: Double): Unit = updater.setDouble(value)
-    override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
-  }
-
   /**
    * Parquet converter for strings. A dictionary is used to minimize string decoding cost.
    */
   private final class CatalystStringConverter(updater: ParentContainerUpdater)
-    extends PrimitiveConverter {
+    extends CatalystPrimitiveConverter(updater) {
 
     private var expandedDictionary: Array[UTF8String] = null
 
@@ -242,7 +274,7 @@ private[parquet] class CatalystRowConverter(
   private final class CatalystDecimalConverter(
       decimalType: DecimalType,
       updater: ParentContainerUpdater)
-    extends PrimitiveConverter {
+    extends CatalystPrimitiveConverter(updater) {
 
     // Converts decimals stored as INT32
     override def addInt(value: Int): Unit = {
@@ -306,7 +338,7 @@ private[parquet] class CatalystRowConverter(
       parquetSchema: GroupType,
       catalystSchema: ArrayType,
       updater: ParentContainerUpdater)
-    extends GroupConverter {
+    extends CatalystGroupConverter(updater) {
 
     private var currentArray: ArrayBuffer[Any] = _
 
@@ -383,7 +415,7 @@ private[parquet] class CatalystRowConverter(
       parquetType: GroupType,
       catalystType: MapType,
       updater: ParentContainerUpdater)
-    extends GroupConverter {
+    extends CatalystGroupConverter(updater) {
 
     private var currentKeys: ArrayBuffer[Any] = _
     private var currentValues: ArrayBuffer[Any] = _
@@ -446,4 +478,61 @@ private[parquet] class CatalystRowConverter(
       }
     }
   }
+
+  private trait RepeatedConverter {
+    private var currentArray: ArrayBuffer[Any] = _
+
+    protected def newArrayUpdater(updater: ParentContainerUpdater) = new ParentContainerUpdater {
+      override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
+      override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
+      override def set(value: Any): Unit = currentArray += value
+    }
+  }
+
+  /**
+   * A primitive converter for converting unannotated repeated primitive values to required arrays
+   * of required primitives values.
+   */
+  private final class RepeatedPrimitiveConverter(
+      parquetType: Type,
+      catalystType: DataType,
+      parentUpdater: ParentContainerUpdater)
+    extends PrimitiveConverter with RepeatedConverter with HasParentContainerUpdater {
+
+    val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
+
+    private val elementConverter: PrimitiveConverter =
+      newConverter(parquetType, catalystType, updater).asPrimitiveConverter()
+
+    override def addBoolean(value: Boolean): Unit = elementConverter.addBoolean(value)
+    override def addInt(value: Int): Unit = elementConverter.addInt(value)
+    override def addLong(value: Long): Unit = elementConverter.addLong(value)
+    override def addFloat(value: Float): Unit = elementConverter.addFloat(value)
+    override def addDouble(value: Double): Unit = elementConverter.addDouble(value)
+    override def addBinary(value: Binary): Unit = elementConverter.addBinary(value)
+
+    override def setDictionary(dict: Dictionary): Unit = elementConverter.setDictionary(dict)
+    override def hasDictionarySupport: Boolean = elementConverter.hasDictionarySupport
+    override def addValueFromDictionary(id: Int): Unit = elementConverter.addValueFromDictionary(id)
+  }
+
+  /**
+   * A group converter for converting unannotated repeated group values to required arrays of
+   * required struct values.
+   */
+  private final class RepeatedGroupConverter(
+      parquetType: Type,
+      catalystType: DataType,
+      parentUpdater: ParentContainerUpdater)
+    extends GroupConverter with HasParentContainerUpdater with RepeatedConverter {
+
+    val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
+
+    private val elementConverter: GroupConverter =
+      newConverter(parquetType, catalystType, updater).asGroupConverter()
+
+    override def getConverter(field: Int): Converter = elementConverter.getConverter(field)
+    override def end(): Unit = elementConverter.end()
+    override def start(): Unit = elementConverter.start()
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index a3fc74cf79..275646e818 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -100,8 +100,11 @@ private[parquet] class CatalystSchemaConverter(
           StructField(field.getName, convertField(field), nullable = false)
 
         case REPEATED =>
-          throw new AnalysisException(
-            s"REPEATED not supported outside LIST or MAP. Type: $field")
+          // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
+          // annotated by `LIST` or `MAP` should be interpreted as a required list of required
+          // elements where the element type is the type of the field.
+          val arrayType = ArrayType(convertField(field), containsNull = false)
+          StructField(field.getName, arrayType, nullable = false)
       }
     }
 
diff --git a/sql/core/src/test/resources/nested-array-struct.parquet b/sql/core/src/test/resources/nested-array-struct.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..41a43fa35d39685e56ba4849a16cba4bb1aa86ae
GIT binary patch
literal 775
zcmaKr-%G+!6vvO#=8u;i;*JSE3{j~lAyWtuV%0Q3*U(Y)B(q&>u(@?NBZ;2+Px=dc
z?6I>s%N6u+cQ4=bIp1@3cBjdsBLbvCDhGte15a`Q8~~)V;d2WY3V@LYX{-@GMj#!6
z`;fvdgDblto20oxMhs#hdKzt*4*3w}iuUE6PW?b*Zs1NAv%1WfvAnT@2NhLn_L#fy
z<hdEiB!d}MPRx7Ea;(65qR`5Sd1ra<I-X6R*Ky7=!@xG99yQQpNPXY9r-WOj^S<lQ
zz6k<fEEPMoTnXD~<#;G=7!!;NU7=gUcdw}-QAJ?)rxw<T7+uh7Dl+e>HFWX={q7*H
z93@^0*O&w#e53@lJ`hFEV2=wL)V**Pb(8vc%<=-4iJz&t;n22J{%<(t!px$!DZLaV
zDaOCYR1UR;Go`F89pTwFrqpgr1NlrDOs+J&f2GO;)PtpmW%OH3ne<itxKVihYx+Iy
zP1DA2(~+VnvaVbtO^3>OEccXHoWyO`6Bl5({+ea14&qL7DtETw`(h_426$BxCYApN
S1!5siKXe$p;U(Ab7x)6M)yB5~

literal 0
HcmV?d00001

diff --git a/sql/core/src/test/resources/old-repeated-int.parquet b/sql/core/src/test/resources/old-repeated-int.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..520922f73ebb75950c4e65dec6689af817cb7a33
GIT binary patch
literal 389
zcmZWmO-sW-5FMkeB_3tN1_Ca@_7p=~Z@EPbSg5juTs)Ocvz0>9#NEw7ivQhdDcI1<
z%;U}1<Bdr383j6}(})rTgc1M;0Ita~-d-U-BFxnBqqm1b87p*a+p^!px$2a(s#@&q
zyD~<8De7R~_=er-{}OnPANav!6yS<65Pwjm;1MtP^2rDQvjcv_SSR&&uWZ~MF2$8H
zYrXAktC`-(MkPGFyd1ilt*R@&cR4>booNUUY~PehCwzvumZho_zD!@T<tooamKND7
z^7g00^~I;iR*R3~dG7wY!0v}~*0nT-&4;8#U*dCf9xo*Vb5BE0jRt|{3VgsH+umyV

literal 0
HcmV?d00001

diff --git a/sql/core/src/test/resources/old-repeated-message.parquet b/sql/core/src/test/resources/old-repeated-message.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..548db991627774512b856b33b045327678b5a7ab
GIT binary patch
literal 600
zcmZ{i&1%9x5XZ+@gQVWdrW^1OLK8}{0S^TUy`(h#0EM8rR0K(Mt$~=t%}Oby@8A=8
z>VtKoQHivd-PzgS{O4oWwfk)ZsDnB!ByvMUB7gt@Rk50{GMw}6DWn-w!#F0CGZwP`
zh81XVct9peJU!6=O6yx`ZywS;EGVOwOOIsCr3p)d)y(vgv`4bce<w(QyE9zd)gS_D
z=mM@LG0~y0*+;UsYiv$Qej^J?Q@J<lP1I0y5S8m)d>)5D7UiKlH0l6Ga0+m-EijTt
zM!<nh<%HO!IEZX2JlR^11yEWTr#@Y-2s6{X4RcDe2@O-0P`s|h{(@MlcI-=&W+92D
z>X)Rd#pSj~EkCao0il-K=62)<B(l`5{or)ReaCfd&w1p2_L*`)@6C0_J!dempPj-v
yF4WA@KwtAWEJmtH-xA=9a<!Al&oX>db~64ZC7r8d1AwIhzFkoG;e&AbpZW#pJ&{5H

literal 0
HcmV?d00001

diff --git a/sql/core/src/test/resources/old-repeated.parquet b/sql/core/src/test/resources/old-repeated.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..213f1a90291b30a8a3161b51c38f008f3ae9f6e5
GIT binary patch
literal 432
zcmZWm!D@p*5ZxNF!5+)X3PMGioUA16Ei?y9g$B|h;-#ms>Ld--Xm{5`DgF13A<&42
znSH!BJNtMWhsm50I-@h68VC$(I7}ZALYRJm-NGUo*2p;a%Z@xEJgH{;FE=Sj6^mNc
zS-TAqXn-pyRtNP8Qt};84d*60yAuBru{7JUo$1)Y6%&KlJ(Uv6u!JS1<Hvy&<TYaZ
zj5S<*wV;BgdpRgT=3?KdS}<|o6)aqD=)^O(dngO{$8_D((3vN%W7{eNv2f>zOP)cw
zaM$5ewB9699EEB0jJ*18aDDn7N1N4K`fzXlnuJ~V?c^nwk}Yeo3wXox4+#3Y!pMU2
V+-`?%2{TWZ?kYh(G4~k%>JK8=aDe~-

literal 0
HcmV?d00001

diff --git a/sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet b/sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet
old mode 100755
new mode 100644
diff --git a/sql/core/src/test/resources/proto-repeated-string.parquet b/sql/core/src/test/resources/proto-repeated-string.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..8a7eea601d0164b1177ab90efb7116c6e4c900da
GIT binary patch
literal 411
zcmY*W!D_-l5S^$E62wc{umKMtR4+{f-b!vM4Q)Y6&|G?w#H^aKansF;gi?C#*Yq1Z
zv6e;{_C4Mk<_)t^FrN}2UmBK6hDddy19SkO`+9soFOY8;=b|A8A$itAvJoQdBBnKK
zK<tj74#zIFm5h;&<v!AwRVtD~iFCjA7E0#3O(11blMax<xJUa%{i}`vOx+q;qe*{6
z2G&>y>)#|`4$W^3YtqL)WN5pTmWh1ZGv$>{f|s#sCG%1VN%LJ&FyD4siH@<(8PDu@
z!?sWEU$)ao`yyr1x2MQ?k}~ewv*0eAE$3kr261?gx~fYY8oxy0auLs;o*#@41L)=X
h7Au}q6}>(e6`sLs-{PvZ8BpWYeN#xd)c_*=mLHLcZu|fM

literal 0
HcmV?d00001

diff --git a/sql/core/src/test/resources/proto-repeated-struct.parquet b/sql/core/src/test/resources/proto-repeated-struct.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..c29eee35c350e84efa2818febb2528309a8ac3ea
GIT binary patch
literal 608
zcma))-D<)x6vvP811a8(bSZdI%9Js*tjca=2puciK%r=F1_P}cr%>B2jf^q&4ts<>
z%r1PaC9^8^%A4e$li&GFTzg<)z+K#J;DQh(TmnD<kVkpBDo_;LwcgY%DJ&^Y<6Aqv
zbsFE^@okK>m&bFDCfsEak0$H6=|yp$CW-$_F@l={DK5j1GEpn8)DX!>A+15G`Fpg}
zMZREE-l#~cYPa=r6<4%c3AD?tzx2bP7Sypiu9pGoi(^0p+XD*$Y;s4$HpQOV<V3O`
zq$1E7gv363TT^bvvJG=alPO!qJYwWK7y2vO(Tzi2hB9B$V5Wa6x}IMC_h`uTdNrY}
zqgz4;{)7e{y=UJ!z0tsN9ouv6g`I!IBB1x|I-|Zb9NG^~Y8<7}%*;Ta<owc+8p@Xf
k<Sbk5H1LyzTxE@1yh$d`#i0QpN(whu41UERT=BR50I{KoDgXcg

literal 0
HcmV?d00001

diff --git a/sql/core/src/test/resources/proto-struct-with-array-many.parquet b/sql/core/src/test/resources/proto-struct-with-array-many.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..ff9809675fc04edbbfc43f4033593a5aa6141dfa
GIT binary patch
literal 802
zcmb7DK}y3w6rChC)R3J_oIn>L*dnD3MsZWzL<}ml5ZY`6p``6p3uzOR6cO<bE<A+?
zaNPrV1eZOCGt(qZx+_eQ|NeXPU;ZD)vHB(i@P{L-1!WS7Py~Pn00>QizQ3hI@;TGm
z8hy+Rm>Hl!&aiC8oEI4i7`u<#dC`r5%q<5r!9eDg1IFy*c2RU=AalzBO)!wT<$y7e
zS0C?=T^uJ)6ePiDIW^oM?BO`}o-pLW<pHY977HAa8AG|wBOzGeI!NQ>HOS&h@*H8x
zD56?ZuNu`Fl-0Tj)YHv=x(@<HR{l7$8dU%qYi?IV>J>C=j)+#mj%Z_4kgdp}D_<3b
zc(o7;z363$6C<SU7!w?iqgj{)5&Gm7-W+!{ecPWdO=sAjXz!Ayr6lWBr+7IZ;i#jT
zB=)@#_B&eFy0*K+zU|nSYu}K3@)nYU?t^0wd-h;p-P&n#m{v1NgT1G7Oh>Cr9}+-E
j<-Z;KUL2!lIhl~NDb+dIHUN;6ire!D{E~S&<aYc6_=2b7

literal 0
HcmV?d00001

diff --git a/sql/core/src/test/resources/proto-struct-with-array.parquet b/sql/core/src/test/resources/proto-struct-with-array.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..325a8370ad20ec31010fde0c816895da9f5dbd27
GIT binary patch
literal 1576
zcmcIlO>5gg5S?rsql%Icnq5|qgD{N<#x?oCY2!n{ZAEKv64iDOJsH{F!~)4uB-v0(
z@BJM;_owufA5^-l4@Z{ekVAVh>zOxi-n`LDMyq>_0q^0x8b<t3#u5MmfXxk=|MG~0
zr8vb80MFte#Wcm26MS4HDz@=<n&cImf-GVxYLf6(akYj^QU8<=MTK(IF_MCmUmvwT
z3qr$pLw7j4srN<`=9ur?>74l?^SjJrp%q&06h8-y4iMdSJ@MDH4c~HjX3j($=&sN1
zW|q&!OYxG3d&~^8@dlzhDa$1b0`rz(6tkBD*J153G=T1;gzF$B0g1WSKnPOy6<d?`
zF;W^(mXreZ7ziR~<V@qfY8L`|00NkHUyKt0tE8%3aNr~cmLp(pz6jVPO>M$~KQ|W5
z5A#DO!$$Zca>TK`;67WBib&?m76{4rqTmNgwH)UCc)*uPhjcg;fc!=Tfl{N?GyS_6
z3+tZPeSOS=k#BjS>(f75Q`2EhwX*hMsK_@Kv&ZT;SydBky3mDR6_J}cL*_TtV}7>H
zA+wumr}b9v46coS`}(TY;qmaR$9wg^82X@n)jvIvzps*~J`|Fl<jK4Gr_b%K{{G42
zxdmpGpRSS{daM6r^3;}OgIVg=dFo<QF7;lo^+8XD86<YsAG@_Z%kgZsX!(et3(Ui=
z-sEK!+xeelE#2p=Tj#5?iWc!NWC4A#d^bk683govIT-TS=F%2kuDvcm<)rqN<HLLl
Iz~JAAzx9@|oB#j-

literal 0
HcmV?d00001

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala
new file mode 100644
index 0000000000..981334cf77
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+
+class ParquetProtobufCompatibilitySuite extends ParquetCompatibilityTest {
+  override def sqlContext: SQLContext = TestSQLContext
+
+  private def readParquetProtobufFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    sqlContext.read.parquet(url.toString)
+  }
+
+  test("unannotated array of primitive type") {
+    checkAnswer(readParquetProtobufFile("old-repeated-int.parquet"), Row(Seq(1, 2, 3)))
+  }
+
+  test("unannotated array of struct") {
+    checkAnswer(
+      readParquetProtobufFile("old-repeated-message.parquet"),
+      Row(
+        Seq(
+          Row("First inner", null, null),
+          Row(null, "Second inner", null),
+          Row(null, null, "Third inner"))))
+
+    checkAnswer(
+      readParquetProtobufFile("proto-repeated-struct.parquet"),
+      Row(
+        Seq(
+          Row("0 - 1", "0 - 2", "0 - 3"),
+          Row("1 - 1", "1 - 2", "1 - 3"))))
+
+    checkAnswer(
+      readParquetProtobufFile("proto-struct-with-array-many.parquet"),
+      Seq(
+        Row(
+          Seq(
+            Row("0 - 0 - 1", "0 - 0 - 2", "0 - 0 - 3"),
+            Row("0 - 1 - 1", "0 - 1 - 2", "0 - 1 - 3"))),
+        Row(
+          Seq(
+            Row("1 - 0 - 1", "1 - 0 - 2", "1 - 0 - 3"),
+            Row("1 - 1 - 1", "1 - 1 - 2", "1 - 1 - 3"))),
+        Row(
+          Seq(
+            Row("2 - 0 - 1", "2 - 0 - 2", "2 - 0 - 3"),
+            Row("2 - 1 - 1", "2 - 1 - 2", "2 - 1 - 3")))))
+  }
+
+  test("struct with unannotated array") {
+    checkAnswer(
+      readParquetProtobufFile("proto-struct-with-array.parquet"),
+      Row(10, 9, Seq.empty, null, Row(9), Seq(Row(9), Row(10))))
+  }
+
+  test("unannotated array of struct with unannotated array") {
+    checkAnswer(
+      readParquetProtobufFile("nested-array-struct.parquet"),
+      Seq(
+        Row(2, Seq(Row(1, Seq(Row(3))))),
+        Row(5, Seq(Row(4, Seq(Row(6))))),
+        Row(8, Seq(Row(7, Seq(Row(9)))))))
+  }
+
+  test("unannotated array of string") {
+    checkAnswer(
+      readParquetProtobufFile("proto-repeated-string.parquet"),
+      Seq(
+        Row(Seq("hello", "world")),
+        Row(Seq("good", "bye")),
+        Row(Seq("one", "two", "three"))))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 8f06de7ce7..971f71e27b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -585,6 +585,36 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin)
 
+  testParquetToCatalyst(
+    "Backwards-compatibility: LIST with non-nullable element type 7 - " +
+      "parquet-protobuf primitive lists",
+    new StructType()
+      .add("f1", ArrayType(IntegerType, containsNull = false), nullable = false),
+    """message root {
+      |  repeated int32 f1;
+      |}
+    """.stripMargin)
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: LIST with non-nullable element type 8 - " +
+      "parquet-protobuf non-primitive lists",
+    {
+      val elementType =
+        new StructType()
+          .add("c1", StringType, nullable = true)
+          .add("c2", IntegerType, nullable = false)
+
+      new StructType()
+        .add("f1", ArrayType(elementType, containsNull = false), nullable = false)
+    },
+    """message root {
+      |  repeated group f1 {
+      |    optional binary c1 (UTF8);
+      |    required int32 c2;
+      |  }
+      |}
+    """.stripMargin)
+
   // =======================================================
   // Tests for converting Catalyst ArrayType to Parquet LIST
   // =======================================================
-- 
GitLab