diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index 9def4559d214edd889fde9fa8e2f9f3e15fafb30..98018b7f48bd8c4ebf563d83adfb3e4488f928c0 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -170,7 +170,7 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori @Override public final Binary readBinary(int len) { - Binary result = Binary.fromByteArray(buffer, offset - Platform.BYTE_ARRAY_OFFSET, len); + Binary result = Binary.fromConstantByteArray(buffer, offset - Platform.BYTE_ARRAY_OFFSET, len); offset += len; return result; } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala index cf974afb2664b50568d802a6f155d43c24cb8ebb..00e1bcaf6327d564a23936fb8065433869c5b860 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala @@ -150,7 +150,8 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi case StringType => (row: SpecializedGetters, ordinal: Int) => - recordConsumer.addBinary(Binary.fromByteArray(row.getUTF8String(ordinal).getBytes)) + recordConsumer.addBinary( + Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes)) case TimestampType => (row: SpecializedGetters, ordinal: Int) => { @@ -165,12 +166,12 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal)) val buf = ByteBuffer.wrap(timestampBuffer) buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay) - recordConsumer.addBinary(Binary.fromByteArray(timestampBuffer)) + recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer)) } case BinaryType => (row: SpecializedGetters, ordinal: Int) => - recordConsumer.addBinary(Binary.fromByteArray(row.getBinary(ordinal))) + recordConsumer.addBinary(Binary.fromReusedByteArray(row.getBinary(ordinal))) case DecimalType.Fixed(precision, scale) => makeDecimalWriter(precision, scale) @@ -227,7 +228,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi shift -= 8 } - recordConsumer.addBinary(Binary.fromByteArray(decimalBuffer, 0, numBytes)) + recordConsumer.addBinary(Binary.fromReusedByteArray(decimalBuffer, 0, numBytes)) } val binaryWriterUsingUnscaledBytes = @@ -248,7 +249,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi decimalBuffer } - recordConsumer.addBinary(Binary.fromByteArray(fixedLengthBytes, 0, numBytes)) + recordConsumer.addBinary(Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes)) } writeLegacyParquetFormat match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 624081250113ade3508a493d9cb6f1e06b002679..7213a38b08838a023de49036a99bd5011f3d4078 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -50,7 +50,6 @@ private[sql] object ParquetFilters { (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float]) case DoubleType => (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double]) - // Binary.fromString and Binary.fromByteArray don't accept null values case StringType => (n: String, v: Any) => FilterApi.eq( @@ -73,7 +72,6 @@ private[sql] object ParquetFilters { (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float]) case DoubleType => (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double]) - case StringType => (n: String, v: Any) => FilterApi.notEq( binaryColumn(n), @@ -93,7 +91,6 @@ private[sql] object ParquetFilters { (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float]) case DoubleType => (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double]) - case StringType => (n: String, v: Any) => FilterApi.lt(binaryColumn(n), @@ -112,7 +109,6 @@ private[sql] object ParquetFilters { (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float]) case DoubleType => (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double]) - case StringType => (n: String, v: Any) => FilterApi.ltEq(binaryColumn(n), @@ -131,8 +127,6 @@ private[sql] object ParquetFilters { (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float]) case DoubleType => (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double]) - - // See https://issues.apache.org/jira/browse/SPARK-11153 case StringType => (n: String, v: Any) => FilterApi.gt(binaryColumn(n), @@ -151,7 +145,6 @@ private[sql] object ParquetFilters { (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float]) case DoubleType => (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double]) - case StringType => (n: String, v: Any) => FilterApi.gtEq(binaryColumn(n), @@ -174,7 +167,6 @@ private[sql] object ParquetFilters { case DoubleType => (n: String, v: Set[Any]) => FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]])) - case StringType => (n: String, v: Set[Any]) => FilterApi.userDefined(binaryColumn(n), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 45fd6a5d80dea32fc11ca77cb56bbdf9c381ee45..2a5666e70f767524e988f28400a8e67a89cca2c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -229,8 +229,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } - // See https://issues.apache.org/jira/browse/SPARK-11153 - ignore("filter pushdown - string") { + test("filter pushdown - string") { withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df => checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) checkFilterPredicate( @@ -258,8 +257,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } - // See https://issues.apache.org/jira/browse/SPARK-11153 - ignore("filter pushdown - binary") { + test("filter pushdown - binary") { implicit class IntToBinary(int: Int) { def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8) }