Skip to content
Snippets Groups Projects
Commit 5ba2d440 authored by Reynold Xin's avatar Reynold Xin
Browse files

Fix flaky HashedRelationSuite

SparkEnv might not have been set in local unit tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #7784 from rxin/HashedRelationSuite and squashes the following commits:

435d64b [Reynold Xin] Fix flaky HashedRelationSuite
parent 4a8bb9d0
No related branches found
No related tags found
No related merge requests found
...@@ -21,7 +21,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} ...@@ -21,7 +21,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import java.nio.ByteOrder import java.nio.ByteOrder
import java.util.{HashMap => JavaHashMap} import java.util.{HashMap => JavaHashMap}
import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.{SparkConf, SparkEnv, TaskContext}
import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkSqlSerializer import org.apache.spark.sql.execution.SparkSqlSerializer
...@@ -260,7 +260,10 @@ private[joins] final class UnsafeHashedRelation( ...@@ -260,7 +260,10 @@ private[joins] final class UnsafeHashedRelation(
val nKeys = in.readInt() val nKeys = in.readInt()
// This is used in Broadcast, shared by multiple tasks, so we use on-heap memory // This is used in Broadcast, shared by multiple tasks, so we use on-heap memory
val memoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP)) val memoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
val pageSizeBytes = SparkEnv.get.conf.getSizeAsBytes("spark.buffer.pageSize", "64m")
val pageSizeBytes = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
.getSizeAsBytes("spark.buffer.pageSize", "64m")
binaryMap = new BytesToBytesMap( binaryMap = new BytesToBytesMap(
memoryManager, memoryManager,
nKeys * 2, // reduce hash collision nKeys * 2, // reduce hash collision
......
...@@ -33,7 +33,7 @@ class HashedRelationSuite extends SparkFunSuite { ...@@ -33,7 +33,7 @@ class HashedRelationSuite extends SparkFunSuite {
override def apply(row: InternalRow): InternalRow = row override def apply(row: InternalRow): InternalRow = row
} }
ignore("GeneralHashedRelation") { test("GeneralHashedRelation") {
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2)) val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
val hashed = HashedRelation(data.iterator, keyProjection) val hashed = HashedRelation(data.iterator, keyProjection)
assert(hashed.isInstanceOf[GeneralHashedRelation]) assert(hashed.isInstanceOf[GeneralHashedRelation])
...@@ -47,7 +47,7 @@ class HashedRelationSuite extends SparkFunSuite { ...@@ -47,7 +47,7 @@ class HashedRelationSuite extends SparkFunSuite {
assert(hashed.get(data(2)) === data2) assert(hashed.get(data(2)) === data2)
} }
ignore("UniqueKeyHashedRelation") { test("UniqueKeyHashedRelation") {
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2)) val data = Array(InternalRow(0), InternalRow(1), InternalRow(2))
val hashed = HashedRelation(data.iterator, keyProjection) val hashed = HashedRelation(data.iterator, keyProjection)
assert(hashed.isInstanceOf[UniqueKeyHashedRelation]) assert(hashed.isInstanceOf[UniqueKeyHashedRelation])
...@@ -64,7 +64,7 @@ class HashedRelationSuite extends SparkFunSuite { ...@@ -64,7 +64,7 @@ class HashedRelationSuite extends SparkFunSuite {
assert(uniqHashed.getValue(InternalRow(10)) === null) assert(uniqHashed.getValue(InternalRow(10)) === null)
} }
ignore("UnsafeHashedRelation") { test("UnsafeHashedRelation") {
val schema = StructType(StructField("a", IntegerType, true) :: Nil) val schema = StructType(StructField("a", IntegerType, true) :: Nil)
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2)) val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
val toUnsafe = UnsafeProjection.create(schema) val toUnsafe = UnsafeProjection.create(schema)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment