From 5ba2d44068b89fd8e81cfd24f49bf20d373f81b9 Mon Sep 17 00:00:00 2001 From: Reynold Xin <rxin@databricks.com> Date: Thu, 30 Jul 2015 01:21:39 -0700 Subject: [PATCH] Fix flaky HashedRelationSuite SparkEnv might not have been set in local unit tests. Author: Reynold Xin <rxin@databricks.com> Closes #7784 from rxin/HashedRelationSuite and squashes the following commits: 435d64b [Reynold Xin] Fix flaky HashedRelationSuite --- .../apache/spark/sql/execution/joins/HashedRelation.scala | 7 +++++-- .../spark/sql/execution/joins/HashedRelationSuite.scala | 6 +++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 7a50739131..26dbc911e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -21,7 +21,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.nio.ByteOrder import java.util.{HashMap => JavaHashMap} -import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.{SparkConf, SparkEnv, TaskContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkSqlSerializer @@ -260,7 +260,10 @@ private[joins] final class UnsafeHashedRelation( val nKeys = in.readInt() // This is used in Broadcast, shared by multiple tasks, so we use on-heap memory val memoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP)) - val pageSizeBytes = SparkEnv.get.conf.getSizeAsBytes("spark.buffer.pageSize", "64m") + + val pageSizeBytes = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) + .getSizeAsBytes("spark.buffer.pageSize", "64m") + binaryMap = new BytesToBytesMap( memoryManager, nKeys * 2, // reduce hash collision diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala index 941f6d4f6a..8b1a9b21a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala @@ -33,7 +33,7 @@ class HashedRelationSuite extends SparkFunSuite { override def apply(row: InternalRow): InternalRow = row } - ignore("GeneralHashedRelation") { + test("GeneralHashedRelation") { val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2)) val hashed = HashedRelation(data.iterator, keyProjection) assert(hashed.isInstanceOf[GeneralHashedRelation]) @@ -47,7 +47,7 @@ class HashedRelationSuite extends SparkFunSuite { assert(hashed.get(data(2)) === data2) } - ignore("UniqueKeyHashedRelation") { + test("UniqueKeyHashedRelation") { val data = Array(InternalRow(0), InternalRow(1), InternalRow(2)) val hashed = HashedRelation(data.iterator, keyProjection) assert(hashed.isInstanceOf[UniqueKeyHashedRelation]) @@ -64,7 +64,7 @@ class HashedRelationSuite extends SparkFunSuite { assert(uniqHashed.getValue(InternalRow(10)) === null) } - ignore("UnsafeHashedRelation") { + test("UnsafeHashedRelation") { val schema = StructType(StructField("a", IntegerType, true) :: Nil) val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2)) val toUnsafe = UnsafeProjection.create(schema) -- GitLab