Skip to content
Snippets Groups Projects
Commit 4a8bb9d0 authored by Reynold Xin's avatar Reynold Xin
Browse files

Revert "[SPARK-9458] Avoid object allocation in prefix generation."

This reverts commit 9514d874.
parent 76f2e393
No related branches found
No related tags found
No related merge requests found
Showing with 67 additions and 35 deletions
......@@ -29,6 +29,7 @@ public class PrefixComparators {
public static final StringPrefixComparator STRING = new StringPrefixComparator();
public static final IntegralPrefixComparator INTEGRAL = new IntegralPrefixComparator();
public static final FloatPrefixComparator FLOAT = new FloatPrefixComparator();
public static final DoublePrefixComparator DOUBLE = new DoublePrefixComparator();
public static final class StringPrefixComparator extends PrefixComparator {
......@@ -54,6 +55,21 @@ public class PrefixComparators {
public final long NULL_PREFIX = Long.MIN_VALUE;
}
public static final class FloatPrefixComparator extends PrefixComparator {
  @Override
  public int compare(long aPrefix, long bPrefix) {
    // Decode the low 32 bits of each prefix back into a float, then delegate
    // to the NaN-safe comparison so NaN values order consistently.
    final float left = Float.intBitsToFloat((int) aPrefix);
    final float right = Float.intBitsToFloat((int) bPrefix);
    return Utils.nanSafeCompareFloats(left, right);
  }

  // Encode a float as a sort prefix: the raw IEEE-754 bits, zero-extended
  // into the low 32 bits of a long.
  public long computePrefix(float value) {
    return ((long) Float.floatToIntBits(value)) & 0xffffffffL;
  }

  // Prefix used for nulls; NEGATIVE_INFINITY so nulls sort first here.
  public final long NULL_PREFIX = computePrefix(Float.NEGATIVE_INFINITY);
}
public static final class DoublePrefixComparator extends PrefixComparator {
@Override
public int compare(long aPrefix, long bPrefix) {
......
......@@ -55,6 +55,18 @@ class PrefixComparatorsSuite extends SparkFunSuite with PropertyChecks {
forAll { (s1: String, s2: String) => testPrefixComparison(s1, s2) }
}
test("float prefix comparator handles NaN properly") {
  // Two distinct NaN bit patterns must collapse to the same prefix, and a
  // NaN prefix must compare greater than the prefix of the largest finite float.
  val nanA: Float = java.lang.Float.intBitsToFloat(0x7f800001)
  val nanB: Float = java.lang.Float.intBitsToFloat(0x7fffffff)
  assert(nanA.isNaN)
  assert(nanB.isNaN)
  val prefixA = PrefixComparators.FLOAT.computePrefix(nanA)
  val prefixB = PrefixComparators.FLOAT.computePrefix(nanB)
  assert(prefixA === prefixB)
  val maxFinitePrefix = PrefixComparators.FLOAT.computePrefix(Float.MaxValue)
  assert(PrefixComparators.FLOAT.compare(prefixA, maxFinitePrefix) === 1)
}
test("double prefix comparator handles NaNs properly") {
val nan1: Double = java.lang.Double.longBitsToDouble(0x7ff0000000000001L)
val nan2: Double = java.lang.Double.longBitsToDouble(0x7fffffffffffffffL)
......
......@@ -121,7 +121,7 @@ final class UnsafeExternalRowSorter {
// here in order to prevent memory leaks.
cleanupResources();
}
return new AbstractScalaRowIterator<InternalRow>() {
return new AbstractScalaRowIterator() {
private final int numFields = schema.length();
private UnsafeRow row = new UnsafeRow();
......
......@@ -19,7 +19,7 @@
package org.apache.spark.sql.execution
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, SortOrder}
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, PrefixComparator}
......@@ -39,54 +39,57 @@ object SortPrefixUtils {
sortOrder.dataType match {
case StringType => PrefixComparators.STRING
case BooleanType | ByteType | ShortType | IntegerType | LongType => PrefixComparators.INTEGRAL
case FloatType | DoubleType => PrefixComparators.DOUBLE
case FloatType => PrefixComparators.FLOAT
case DoubleType => PrefixComparators.DOUBLE
case _ => NoOpPrefixComparator
}
}
def getPrefixComputer(sortOrder: SortOrder): InternalRow => Long = {
val bound = sortOrder.child.asInstanceOf[BoundReference]
val pos = bound.ordinal
sortOrder.dataType match {
case StringType =>
(row: InternalRow) => {
PrefixComparators.STRING.computePrefix(row.getUTF8String(pos))
}
case StringType => (row: InternalRow) => {
PrefixComparators.STRING.computePrefix(sortOrder.child.eval(row).asInstanceOf[UTF8String])
}
case BooleanType =>
(row: InternalRow) => {
if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX
else if (row.getBoolean(pos)) 1
val exprVal = sortOrder.child.eval(row)
if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
else if (sortOrder.child.eval(row).asInstanceOf[Boolean]) 1
else 0
}
case ByteType =>
(row: InternalRow) => {
if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getByte(pos)
val exprVal = sortOrder.child.eval(row)
if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
else sortOrder.child.eval(row).asInstanceOf[Byte]
}
case ShortType =>
(row: InternalRow) => {
if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getShort(pos)
val exprVal = sortOrder.child.eval(row)
if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
else sortOrder.child.eval(row).asInstanceOf[Short]
}
case IntegerType =>
(row: InternalRow) => {
if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getInt(pos)
val exprVal = sortOrder.child.eval(row)
if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
else sortOrder.child.eval(row).asInstanceOf[Int]
}
case LongType =>
(row: InternalRow) => {
if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getLong(pos)
val exprVal = sortOrder.child.eval(row)
if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
else sortOrder.child.eval(row).asInstanceOf[Long]
}
case FloatType => (row: InternalRow) => {
if (row.isNullAt(pos)) {
PrefixComparators.DOUBLE.NULL_PREFIX
} else {
PrefixComparators.DOUBLE.computePrefix(row.getFloat(pos).toDouble)
}
val exprVal = sortOrder.child.eval(row)
if (exprVal == null) PrefixComparators.FLOAT.NULL_PREFIX
else PrefixComparators.FLOAT.computePrefix(sortOrder.child.eval(row).asInstanceOf[Float])
}
case DoubleType => (row: InternalRow) => {
if (row.isNullAt(pos)) {
PrefixComparators.DOUBLE.NULL_PREFIX
} else {
PrefixComparators.DOUBLE.computePrefix(row.getDouble(pos))
}
val exprVal = sortOrder.child.eval(row)
if (exprVal == null) PrefixComparators.DOUBLE.NULL_PREFIX
else PrefixComparators.DOUBLE.computePrefix(sortOrder.child.eval(row).asInstanceOf[Double])
}
case _ => (row: InternalRow) => 0L
}
......
......@@ -340,8 +340,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
*/
def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
if (sqlContext.conf.unsafeEnabled && sqlContext.conf.codegenEnabled &&
TungstenSort.supportsSchema(child.schema)) {
execution.TungstenSort(sortExprs, global, child)
UnsafeExternalSort.supportsSchema(child.schema)) {
execution.UnsafeExternalSort(sortExprs, global, child)
} else if (sqlContext.conf.externalSortEnabled) {
execution.ExternalSort(sortExprs, global, child)
} else {
......
......@@ -97,7 +97,7 @@ case class ExternalSort(
* @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
* spill every `frequency` records.
*/
case class TungstenSort(
case class UnsafeExternalSort(
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan,
......@@ -110,6 +110,7 @@ case class TungstenSort(
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
assert(codegenEnabled, "UnsafeExternalSort requires code generation to be enabled")
def doSort(iterator: Iterator[InternalRow]): Iterator[InternalRow] = {
val ordering = newOrdering(sortOrder, child.output)
val boundSortExpression = BindReferences.bindReference(sortOrder.head, child.output)
......@@ -148,7 +149,7 @@ case class TungstenSort(
}
@DeveloperApi
object TungstenSort {
object UnsafeExternalSort {
/**
* Return true if UnsafeExternalSort can sort rows with the given schema, false otherwise.
*/
......
......@@ -31,7 +31,7 @@ class RowFormatConvertersSuite extends SparkPlanTest {
private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))
assert(!outputsSafe.outputsUnsafeRows)
private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null))
private val outputsUnsafe = UnsafeExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))
assert(outputsUnsafe.outputsUnsafeRows)
test("planner should insert unsafe->safe conversions when required") {
......
......@@ -42,7 +42,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
checkThatPlansAgree(
(1 to 100).map(v => Tuple1(v)).toDF("a"),
(child: SparkPlan) => Limit(10, TungstenSort('a.asc :: Nil, true, child)),
(child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
(child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
sortAnswers = false
)
......@@ -53,7 +53,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
try {
checkThatPlansAgree(
(1 to 100).map(v => Tuple1(v)).toDF("a"),
(child: SparkPlan) => Limit(10, TungstenSort('a.asc :: Nil, true, child)),
(child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
(child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
sortAnswers = false
)
......@@ -68,7 +68,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
val stringLength = 1024 * 1024 * 2
checkThatPlansAgree(
Seq(Tuple1("a" * stringLength), Tuple1("b" * stringLength)).toDF("a").repartition(1),
TungstenSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
UnsafeExternalSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
Sort(sortOrder, global = true, _: SparkPlan),
sortAnswers = false
)
......@@ -88,11 +88,11 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
TestSQLContext.sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
StructType(StructField("a", dataType, nullable = true) :: Nil)
)
assert(TungstenSort.supportsSchema(inputDf.schema))
assert(UnsafeExternalSort.supportsSchema(inputDf.schema))
checkThatPlansAgree(
inputDf,
plan => ConvertToSafe(
TungstenSort(sortOrder, global = true, plan: SparkPlan, testSpillFrequency = 23)),
UnsafeExternalSort(sortOrder, global = true, plan: SparkPlan, testSpillFrequency = 23)),
Sort(sortOrder, global = true, _: SparkPlan),
sortAnswers = false
)
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment