diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index c988f1d1b972e9df969609c0d1d77273e38a6290..bfcf111385b7eceaf8f5bbd41b4a84c53addbfb4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -145,7 +145,13 @@ final class Decimal extends Ordered[Decimal] with Serializable {
     }
   }
 
-  def toJavaBigDecimal: java.math.BigDecimal = toBigDecimal.underlying()
+  def toJavaBigDecimal: java.math.BigDecimal = {
+    if (decimalVal.ne(null)) {
+      decimalVal.underlying()
+    } else {
+      java.math.BigDecimal.valueOf(longVal, _scale)
+    }
+  }
 
   def toUnscaledLong: Long = {
     if (decimalVal.ne(null)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 2ff2fda3610b8857a5de9a02ffe23e6163f5b478..050d3610a6413e68d7d363dfd98721b8391e72b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -302,7 +302,13 @@ private[parquet] class CatalystRowConverter(
     }
 
     override def addBinary(value: Binary): Unit = {
-      updater.set(UTF8String.fromBytes(value.getBytes))
+      // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we
+      // are using `Binary.toByteBuffer.array()` to steal the underlying byte array without copying
+      // it.
+      val buffer = value.toByteBuffer
+      val offset = buffer.position()
+      val numBytes = buffer.limit() - buffer.position()
+      updater.set(UTF8String.fromBytes(buffer.array(), offset, numBytes))
     }
   }
 
@@ -332,24 +338,30 @@ private[parquet] class CatalystRowConverter(
     private def toDecimal(value: Binary): Decimal = {
       val precision = decimalType.precision
       val scale = decimalType.scale
-      val bytes = value.getBytes
 
       if (precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64) {
-        // Constructs a `Decimal` with an unscaled `Long` value if possible.
+        // Constructs a `Decimal` with an unscaled `Long` value if possible. The underlying
+        // `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we are using
+        // `Binary.toByteBuffer.array()` to steal the underlying byte array without copying it.
+        val buffer = value.toByteBuffer
+        val bytes = buffer.array()
+        val start = buffer.position()
+        val end = buffer.limit()
+
         var unscaled = 0L
-        var i = 0
+        var i = start
 
-        while (i < bytes.length) {
+        while (i < end) {
           unscaled = (unscaled << 8) | (bytes(i) & 0xff)
           i += 1
         }
 
-        val bits = 8 * bytes.length
+        val bits = 8 * (end - start)
         unscaled = (unscaled << (64 - bits)) >> (64 - bits)
         Decimal(unscaled, precision, scale)
       } else {
         // Otherwise, resorts to an unscaled `BigInteger` instead.
-        Decimal(new BigDecimal(new BigInteger(bytes), scale), precision, scale)
+        Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), precision, scale)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 2d237da81c20d235d6092b5283acc3291b012d99..97ffeb08aafe2cca69cbe14b03061accef70d75b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -567,9 +567,9 @@ private[parquet] object CatalystSchemaConverter {
     }
   }
 
-  val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4)
+  val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4)  /* 9 */
 
-  val MAX_PRECISION_FOR_INT64 = maxPrecisionForBytes(8)
+  val MAX_PRECISION_FOR_INT64 = maxPrecisionForBytes(8)  /* 18 */
 
   // Max precision of a decimal value stored in `numBytes` bytes
   def maxPrecisionForBytes(numBytes: Int): Int = {
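For reference, here is how the fast path's sign-extension trick in `toDecimal` works outside the converter. This is a minimal, standalone sketch, not part of the patch: `UnscaledLongSketch` and `decode` are illustrative names, and it assumes a heap-backed `ByteBuffer` (the same assumption the patch makes about `Binary.toByteBuffer`).

```scala
import java.math.BigInteger
import java.nio.ByteBuffer

object UnscaledLongSketch {
  // Decodes a big-endian two's-complement byte slice into an unscaled Long,
  // mirroring the new fast path in `toDecimal`.
  def decode(buffer: ByteBuffer): Long = {
    // `array()` only works for heap buffers; it throws for direct buffers.
    val bytes = buffer.array()
    val start = buffer.position()
    val end = buffer.limit()

    // Accumulate bytes most-significant first.
    var unscaled = 0L
    var i = start
    while (i < end) {
      unscaled = (unscaled << 8) | (bytes(i) & 0xff)
      i += 1
    }

    // Sign extension: shift the accumulated bits to the top of the Long, then
    // arithmetic-right-shift them back so the top byte's sign bit fills the
    // unused high bits.
    val bits = 8 * (end - start)
    (unscaled << (64 - bits)) >> (64 - bits)
  }

  def main(args: Array[String]): Unit = {
    val bytes = Array[Byte](0xff.toByte, 0x38.toByte) // -200 as two bytes
    val fast = decode(ByteBuffer.wrap(bytes))
    val slow = new BigInteger(bytes).longValueExact()
    assert(fast == slow, s"$fast != $slow")
    println(fast) // -200
  }
}
```

Going through an unscaled `Long` this way avoids allocating a `BigInteger`/`BigDecimal` pair per value, which is why the fast path is gated on `MAX_PRECISION_FOR_INT64` (precision <= 18 always fits in a `Long`).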