From 2272962eb087ffedaee12c761506e33e45bd0239 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO <linguin.m.s@gmail.com> Date: Thu, 1 Oct 2015 21:33:27 -0400 Subject: [PATCH] [SPARK-9867] [SQL] Move utilities for binary data into ByteArray The utilities such as Substring#substringBinarySQL and BinaryPrefixComparator#computePrefix for binary data are put together in ByteArray for easy-to-read. Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes #8122 from maropu/CleanUpForBinaryType. --- .../unsafe/sort/PrefixComparators.java | 17 +------ .../expressions/stringExpressions.scala | 39 ++------------- .../apache/spark/unsafe/types/ByteArray.java | 47 ++++++++++++++++++- 3 files changed, 52 insertions(+), 51 deletions(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java index 71b76d5ddf..d2bf297c6c 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java @@ -21,6 +21,7 @@ import com.google.common.primitives.UnsignedLongs; import org.apache.spark.annotation.Private; import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.types.ByteArray; import org.apache.spark.unsafe.types.UTF8String; import org.apache.spark.util.Utils; @@ -62,21 +63,7 @@ public class PrefixComparators { } public static long computePrefix(byte[] bytes) { - if (bytes == null) { - return 0L; - } else { - /** - * TODO: If a wrapper for BinaryType is created (SPARK-8786), - * these codes below will be in the wrapper class. - */ - final int minLen = Math.min(bytes.length, 8); - long p = 0; - for (int i = 0; i < minLen; ++i) { - p |= (128L + Platform.getByte(bytes, Platform.BYTE_ARRAY_OFFSET + i)) - << (56 - 8 * i); - } - return p; - } + return ByteArray.getPrefix(bytes); } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index a09d5b6e3a..4ab27c044f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -18,14 +18,12 @@ package org.apache.spark.sql.catalyst.expressions import java.text.DecimalFormat -import java.util.Arrays -import java.util.{Map => JMap, HashMap} -import java.util.Locale +import java.util.{HashMap, Locale, Map => JMap} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.unsafe.types.{ByteArray, UTF8String} //////////////////////////////////////////////////////////////////////////////////////////////////// // This file defines expressions for string operations. @@ -690,34 +688,6 @@ case class StringSpace(child: Expression) override def prettyName: String = "space" } -object Substring { - def subStringBinarySQL(bytes: Array[Byte], pos: Int, len: Int): Array[Byte] = { - if (pos > bytes.length) { - return Array[Byte]() - } - - var start = if (pos > 0) { - pos - 1 - } else if (pos < 0) { - bytes.length + pos - } else { - 0 - } - - val end = if ((bytes.length - start) < len) { - bytes.length - } else { - start + len - } - - start = Math.max(start, 0) // underflow - if (start < end) { - Arrays.copyOfRange(bytes, start, end) - } else { - Array[Byte]() - } - } -} /** * A function that takes a substring of its first argument starting at a given position. * Defined for String and Binary types. @@ -740,18 +710,17 @@ case class Substring(str: Expression, pos: Expression, len: Expression) str.dataType match { case StringType => string.asInstanceOf[UTF8String] .substringSQL(pos.asInstanceOf[Int], len.asInstanceOf[Int]) - case BinaryType => Substring.subStringBinarySQL(string.asInstanceOf[Array[Byte]], + case BinaryType => ByteArray.subStringSQL(string.asInstanceOf[Array[Byte]], pos.asInstanceOf[Int], len.asInstanceOf[Int]) } } override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { - val cls = classOf[Substring].getName defineCodeGen(ctx, ev, (string, pos, len) => { str.dataType match { case StringType => s"$string.substringSQL($pos, $len)" - case BinaryType => s"$cls.subStringBinarySQL($string, $pos, $len)" + case BinaryType => s"${classOf[ByteArray].getName}.subStringSQL($string, $pos, $len)" } }) } diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java b/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java index c08c9c73d2..3ced2094f5 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java @@ -19,7 +19,11 @@ package org.apache.spark.unsafe.types; import org.apache.spark.unsafe.Platform; -public class ByteArray { +import java.util.Arrays; + +public final class ByteArray { + + public static final byte[] EMPTY_BYTE = new byte[0]; /** * Writes the content of a byte array into a memory address, identified by an object and an @@ -29,4 +33,45 @@ public class ByteArray { public static void writeToMemory(byte[] src, Object target, long targetOffset) { Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET, target, targetOffset, src.length); } + + /** + * Returns a 64-bit integer that can be used as the prefix used in sorting. + */ + public static long getPrefix(byte[] bytes) { + if (bytes == null) { + return 0L; + } else { + final int minLen = Math.min(bytes.length, 8); + long p = 0; + for (int i = 0; i < minLen; ++i) { + p |= (128L + Platform.getByte(bytes, Platform.BYTE_ARRAY_OFFSET + i)) + << (56 - 8 * i); + } + return p; + } + } + + public static byte[] subStringSQL(byte[] bytes, int pos, int len) { + // This pos calculation is according to UTF8String#subStringSQL + if (pos > bytes.length) { + return EMPTY_BYTE; + } + int start = 0; + int end; + if (pos > 0) { + start = pos - 1; + } else if (pos < 0) { + start = bytes.length + pos; + } + if ((bytes.length - start) < len) { + end = bytes.length; + } else { + end = start + len; + } + start = Math.max(start, 0); // underflow + if (start >= end) { + return EMPTY_BYTE; + } + return Arrays.copyOfRange(bytes, start, end); + } } -- GitLab