Skip to content
Snippets Groups Projects
Commit 743a31a7 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #210 from haitaoyao/http-timeout

add http timeout for httpbroadcast

While pulling task bytecode from the HttpBroadcast server, no timeout value is set. This may cause the Spark executor code to hang, leaving other tasks in the same executor process waiting for the lock. I have encountered this issue in my cluster. Here's the stack trace I captured: https://gist.github.com/haitaoyao/7655830

So add a timeout value to ensure the task fails fast.
parents fb6875dd db998a6e
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,7 @@ package org.apache.spark.broadcast ...@@ -19,6 +19,7 @@ package org.apache.spark.broadcast
import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream} import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream}
import java.net.URL import java.net.URL
import java.util.concurrent.TimeUnit
import it.unimi.dsi.fastutil.io.FastBufferedInputStream import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
...@@ -83,6 +84,8 @@ private object HttpBroadcast extends Logging { ...@@ -83,6 +84,8 @@ private object HttpBroadcast extends Logging {
private val files = new TimeStampedHashSet[String] private val files = new TimeStampedHashSet[String]
private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup) private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup)
private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5,TimeUnit.MINUTES).toInt
private lazy val compressionCodec = CompressionCodec.createCodec() private lazy val compressionCodec = CompressionCodec.createCodec()
def initialize(isDriver: Boolean) { def initialize(isDriver: Boolean) {
...@@ -138,10 +141,13 @@ private object HttpBroadcast extends Logging { ...@@ -138,10 +141,13 @@ private object HttpBroadcast extends Logging {
def read[T](id: Long): T = { def read[T](id: Long): T = {
val url = serverUri + "/" + BroadcastBlockId(id).name val url = serverUri + "/" + BroadcastBlockId(id).name
val in = { val in = {
val httpConnection = new URL(url).openConnection()
httpConnection.setReadTimeout(httpReadTimeout)
val inputStream = httpConnection.getInputStream()
if (compress) { if (compress) {
compressionCodec.compressedInputStream(new URL(url).openStream()) compressionCodec.compressedInputStream(inputStream)
} else { } else {
new FastBufferedInputStream(new URL(url).openStream(), bufferSize) new FastBufferedInputStream(inputStream, bufferSize)
} }
} }
val ser = SparkEnv.get.serializer.newInstance() val ser = SparkEnv.get.serializer.newInstance()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment