Commit 9d117cee authored by nyaapa, committed by Sean Owen

[SPARK-6440][CORE]Handle IPv6 addresses properly when constructing URI

Author: nyaapa <nyaapa@gmail.com>

Closes #5424 from nyaapa/master and squashes the following commits:

6b717aa [nyaapa] [SPARK-6440][CORE] Remove Utils.localIpAddressHostname, Utils.localIpAddressURI and Utils.getAddressHostName; make Utils.localIpAddress private; rename Utils.localHostURI into Utils.localHostNameForURI; use Utils.localHostName in org.apache.spark.streaming.kinesis.KinesisReceiver and org.apache.spark.sql.hive.thriftserver.SparkSQLEnv
2098081 [nyaapa] [SPARK-6440][CORE] style fixes and use getHostAddress instead of getHostName
84763d7 [nyaapa] [SPARK-6440][CORE]Handle IPv6 addresses properly when constructing URI
parent 14ce3ea2
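
The gist of the change: an IPv6 literal interpolated directly into a URI authority is ambiguous, because the address's own colons collide with the host:port separator; RFC 2732 requires the literal to be wrapped in brackets. Below is a minimal sketch of the failure mode and the fix, using the same Guava helper the patch imports; the address is illustrative.

import java.net.{InetAddress, URI}
import com.google.common.net.InetAddresses

object Ipv6UriSketch {
  def main(args: Array[String]): Unit = {
    val v6 = InetAddress.getByName("2001:db8::1")
    // Raw interpolation yields an authority full of colons,
    // e.g. "http://2001:db8:0:0:0:0:0:1:8080" -- no way to tell
    // where the host ends and the port begins.
    println(s"http://${v6.getHostAddress}:8080")
    // InetAddresses.toUriString brackets IPv6 literals ("[2001:db8::1]")
    // and returns IPv4 literals unchanged, so one code path serves both.
    val ok = s"http://${InetAddresses.toUriString(v6)}:8080"
    println(new URI(ok).getHost) // "[2001:db8::1]"
  }
}

The diff below threads this through every place Spark builds a URL from the local address.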
@@ -160,7 +160,7 @@ private[spark] class HttpServer(
       throw new ServerStateException("Server is not started")
     } else {
       val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
-      s"$scheme://${Utils.localIpAddress}:$port"
+      s"$scheme://${Utils.localHostNameForURI()}:$port"
     }
   }
 }
@@ -53,7 +53,7 @@ class LocalSparkCluster(
     /* Start the Master */
     val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
     masterActorSystems += masterSystem
-    val masterUrl = "spark://" + localHostname + ":" + masterPort
+    val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort
     val masters = Array(masterUrl)

     /* Start the Workers */
@@ -46,7 +46,7 @@ private[spark] object TestClient {
   def main(args: Array[String]) {
     val url = args(0)
     val conf = new SparkConf
-    val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
+    val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localHostName(), 0,
       conf = conf, securityManager = new SecurityManager(conf))
     val desc = new ApplicationDescription("TestClient", Some(1), 512,
       Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
@@ -48,7 +48,7 @@ private[spark] abstract class WebUI(
   protected val handlers = ArrayBuffer[ServletContextHandler]()
   protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
   protected var serverInfo: Option[ServerInfo] = None
-  protected val localHostName = Utils.localHostName()
+  protected val localHostName = Utils.localHostNameForURI()
   protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)

   private val className = Utils.getFormattedClassName(this)
@@ -34,6 +34,7 @@ import scala.util.Try
 import scala.util.control.{ControlThrowable, NonFatal}

 import com.google.common.io.{ByteStreams, Files}
+import com.google.common.net.InetAddresses
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
@@ -789,13 +790,12 @@ private[spark] object Utils extends Logging {
    * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
    * Note, this is typically not used from within core spark.
    */
-  lazy val localIpAddress: String = findLocalIpAddress()
-  lazy val localIpAddressHostname: String = getAddressHostName(localIpAddress)
+  private lazy val localIpAddress: InetAddress = findLocalInetAddress()

-  private def findLocalIpAddress(): String = {
+  private def findLocalInetAddress(): InetAddress = {
     val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
     if (defaultIpOverride != null) {
-      defaultIpOverride
+      InetAddress.getByName(defaultIpOverride)
     } else {
       val address = InetAddress.getLocalHost
       if (address.isLoopbackAddress) {
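
Since the override is now parsed with InetAddress.getByName, SPARK_LOCAL_IP can be set to an IPv6 literal as well as an IPv4 one; for a valid literal, getByName performs no DNS lookup. A small sketch (the addresses are examples):

import java.net.InetAddress

val v4 = InetAddress.getByName("192.0.2.10")  // yields an Inet4Address
val v6 = InetAddress.getByName("2001:db8::1") // yields an Inet6Address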
@@ -806,15 +806,20 @@ private[spark] object Utils extends Logging {
         // It's more proper to pick ip address following system output order.
         val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.toList
         val reOrderedNetworkIFs = if (isWindows) activeNetworkIFs else activeNetworkIFs.reverse
         for (ni <- reOrderedNetworkIFs) {
-          for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress &&
-            !addr.isLoopbackAddress && addr.isInstanceOf[Inet4Address]) {
+          val addresses = ni.getInetAddresses.toList
+            .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress)
+          if (addresses.nonEmpty) {
+            val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head)
+            // Inet6Address.toHostName may append the interface name if it knows it,
+            // so rebuild the address from its raw bytes to drop any scope id
+            val strippedAddress = InetAddress.getByAddress(addr.getAddress)
             // We've found an address that looks reasonable!
             logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
-              " a loopback address: " + address.getHostAddress + "; using " + addr.getHostAddress +
-              " instead (on interface " + ni.getName + ")")
+              " a loopback address: " + address.getHostAddress + "; using " +
+              strippedAddress.getHostAddress + " instead (on interface " + ni.getName + ")")
             logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
-            return addr.getHostAddress
+            return strippedAddress
           }
         }
         logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
@@ -822,7 +827,7 @@ private[spark] object Utils extends Logging {
           " external IP address!")
         logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
       }
-      address.getHostAddress
+      address
     }
   }
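
The strippedAddress step exists because getHostAddress on a scoped Inet6Address appends a zone suffix (e.g. %eth0 or %2), which must not leak into hostnames or URIs; rebuilding the address from its raw bytes discards the scope. A sketch with a hypothetical link-local address:

import java.net.{Inet6Address, InetAddress}

val bytes = InetAddress.getByName("fe80::1").getAddress
val scoped = Inet6Address.getByAddress(null, bytes, 2)  // scope id 2, hypothetical
println(scoped.getHostAddress)                          // "fe80:0:0:0:0:0:0:1%2"
println(InetAddress.getByAddress(bytes).getHostAddress) // "fe80:0:0:0:0:0:0:1"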
@@ -842,11 +847,14 @@ private[spark] object Utils extends Logging {
    * Get the local machine's hostname.
    */
   def localHostName(): String = {
-    customHostname.getOrElse(localIpAddressHostname)
+    customHostname.getOrElse(localIpAddress.getHostAddress)
   }

-  def getAddressHostName(address: String): String = {
-    InetAddress.getByName(address).getHostName
+  /**
+   * Get the local machine's URI.
+   */
+  def localHostNameForURI(): String = {
+    customHostname.getOrElse(InetAddresses.toUriString(localIpAddress))
   }

   def checkHost(host: String, message: String = "") {
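
The two accessors now divide the work: localHostName() yields the raw address (or the configured hostname override) for logs and identifiers, while localHostNameForURI() yields a URI-safe form via Guava. Plausible values on a host whose local address is 2001:db8::1:

Utils.localHostName()       // "2001:db8:0:0:0:0:0:1"  -- Java's raw form, for logs and ids
Utils.localHostNameForURI() // "[2001:db8::1]"         -- bracketed, safe in URIs
"spark://" + Utils.localHostNameForURI() + ":" + 7077
// => "spark://[2001:db8::1]:7077"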
@@ -23,6 +23,7 @@ import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils

 import com.amazonaws.auth.AWSCredentialsProvider
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain

@@ -118,7 +119,7 @@ private[kinesis] class KinesisReceiver(
    * method.
    */
   override def onStart() {
-    workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID()
+    workerId = Utils.localHostName() + ":" + UUID.randomUUID()
     credentialsProvider = new DefaultAWSCredentialsProviderChain()
     kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName,
       credentialsProvider, workerId).withKinesisEndpoint(endpointUrl)
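
Note that the worker id deliberately uses the unbracketed form: it is an opaque identifier rather than a URI, so the extra colons of an IPv6 address are harmless, and localHostName() additionally honors Spark's hostname override. An illustrative value:

// e.g. "2001:db8:0:0:0:0:0:1:550e8400-e29b-41d4-a716-446655440000" (made-up UUID)
val workerId = Utils.localHostName() + ":" + UUID.randomUUID()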
@@ -22,6 +22,7 @@ import scala.collection.JavaConversions._
 import org.apache.spark.scheduler.StatsReportListener
 import org.apache.spark.sql.hive.{HiveShim, HiveContext}
 import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.util.Utils

 /** A singleton object for the master program. The slaves should not access this. */
 private[hive] object SparkSQLEnv extends Logging {

@@ -37,7 +38,7 @@ private[hive] object SparkSQLEnv extends Logging {
     val maybeKryoReferenceTracking = sparkConf.getOption("spark.kryo.referenceTracking")

     sparkConf
-      .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}")
+      .setAppName(s"SparkSQL::${Utils.localHostName()}")
       .set("spark.sql.hive.version", HiveShim.version)
       .set(
         "spark.serializer",