Commit fa66ef6c authored by Jacky Li, committed by Michael Armbrust

[SPARK-4269][SQL] make wait time configurable in BroadcastHashJoin

BroadcastHashJoin currently uses a hard-coded value (5 minutes) when waiting for the small table to be executed and broadcast.
In my opinion, this should be configurable, since the broadcast may take longer than 5 minutes in some cases, for example in a busy or congested network environment.

Author: Jacky Li <jacky.likun@huawei.com>

Closes #3133 from jackylk/timeout-config and squashes the following commits:

733ac08 [Jacky Li] add spark.sql.broadcastTimeout in SQLConf.scala
557acd4 [Jacky Li] switch to sqlContext.getConf
81a5e20 [Jacky Li] make wait time configurable in BroadcastHashJoin
parent a66c23e1
@@ -38,6 +38,7 @@ private[spark] object SQLConf {
val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
+ val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"
// Options that control which operators can be chosen by the query planner. These should be
// considered hints and may be ignored by future versions of Spark SQL.
@@ -148,6 +149,12 @@ private[sql] trait SQLConf {
private[spark] def columnNameOfCorruptRecord: String =
getConf(COLUMN_NAME_OF_CORRUPT_RECORD, "_corrupt_record")
+ /**
+  * Timeout in seconds for the broadcast wait time in hash join
+  */
+ private[spark] def broadcastTimeout: Int =
+   getConf(BROADCAST_TIMEOUT, (5 * 60).toString).toInt
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
......
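The accessor in this hunk reads the setting as a string, falls back to `(5 * 60).toString` when the key is unset, and parses the result with `toInt`. The following is a minimal, self-contained sketch of that lookup pattern. `ConfSketch` and its backing map are hypothetical stand-ins for illustration only; they are not the actual `SQLConf` implementation.

```scala
import scala.collection.mutable

// Hypothetical stand-in for SQLConf: settings are stored as strings.
object ConfSketch {
  val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"

  // Assumption: a plain mutable map is enough to illustrate the lookup.
  private val settings = mutable.Map.empty[String, String]

  def setConf(key: String, value: String): Unit = settings(key) = value

  // Return the stored value, or the supplied default when the key is unset.
  def getConf(key: String, defaultValue: String): String =
    settings.getOrElse(key, defaultValue)

  // Same shape as the accessor in the diff: seconds, defaulting to 5 * 60.
  def broadcastTimeout: Int =
    getConf(BROADCAST_TIMEOUT, (5 * 60).toString).toInt

  def main(args: Array[String]): Unit = {
    println(broadcastTimeout)            // default, nothing set yet
    setConf(BROADCAST_TIMEOUT, "600")
    println(broadcastTimeout)            // value after the override
  }
}
```

Because the value is parsed with `toInt`, a non-numeric setting would fail with a `NumberFormatException` when the accessor is called rather than when the value is set.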
@@ -42,6 +42,15 @@ case class BroadcastHashJoin(
right: SparkPlan)
extends BinaryNode with HashJoin {
+ val timeout = {
+   val timeoutValue = sqlContext.broadcastTimeout
+   if (timeoutValue < 0) {
+     Duration.Inf
+   } else {
+     timeoutValue.seconds
+   }
+ }
override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
override def requiredChildDistribution =
@@ -56,7 +65,7 @@ case class BroadcastHashJoin(
}
override def execute() = {
- val broadcastRelation = Await.result(broadcastFuture, 5.minute)
+ val broadcastRelation = Await.result(broadcastFuture, timeout)
streamedPlan.execute().mapPartitions { streamedIter =>
hashJoin(streamedIter, broadcastRelation.value)
......
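The change maps a negative setting to `Duration.Inf` (wait indefinitely) and any other value to a finite number of seconds, then passes the result to `Await.result`. Below is a small standalone sketch of that conversion using only the Scala standard library. `TimeoutSketch`, `toTimeout`, and the toy future are hypothetical names for illustration and do not appear in the commit.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TimeoutSketch {
  // Negative values mean "no limit"; anything else is a count of seconds.
  def toTimeout(timeoutSeconds: Int): Duration =
    if (timeoutSeconds < 0) Duration.Inf else timeoutSeconds.seconds

  def main(args: Array[String]): Unit = {
    // Stand-in for the broadcast future; the real one builds the hashed relation.
    val work = Future { 21 * 2 }

    // Await.result throws a TimeoutException if the future is not done in time.
    val result = Await.result(work, toTimeout(5 * 60))
    println(result)
  }
}
```

Under this mapping a setting of `0` becomes a zero-length wait, so `Await.result` would succeed only if the future had already completed; only negative values disable the limit.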