diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 09a9ccc226721ddc942c0a8c9460cdb3629e2618..8de3a6c04df34a81aecaf7c49eef4769b888b4a5 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -160,7 +160,7 @@ private[spark] class HttpServer( throw new ServerStateException("Server is not started") } else { val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http" - s"$scheme://${Utils.localIpAddress}:$port" + s"$scheme://${Utils.localHostNameForURI()}:$port" } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 3ab425aab84c823caab336b3aee4dcb030de1c62..f0e77c2ba982b20ff21238caec4929a86d02a2ab 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -53,7 +53,7 @@ class LocalSparkCluster( /* Start the Master */ val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf) masterActorSystems += masterSystem - val masterUrl = "spark://" + localHostname + ":" + masterPort + val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort val masters = Array(masterUrl) /* Start the Workers */ diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index c1c4812f17fbe20c03f0da3282b240957ccf3b96..40835b9550586ebcb8de272ffb95b208f8fb923a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -46,7 +46,7 @@ private[spark] object TestClient { def main(args: Array[String]) { val url = args(0) val conf = new SparkConf - val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0, + val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localHostName(), 0, conf = conf, securityManager = new SecurityManager(conf)) val desc = new ApplicationDescription("TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored") diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index ea548f23120d981b27d887d49e6c8b3c9c31caf7..f9860d1a5ce769cfade3868f09e796d22cbb43ff 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -48,7 +48,7 @@ private[spark] abstract class WebUI( protected val handlers = ArrayBuffer[ServletContextHandler]() protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]] protected var serverInfo: Option[ServerInfo] = None - protected val localHostName = Utils.localHostName() + protected val localHostName = Utils.localHostNameForURI() protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName) private val className = Utils.getFormattedClassName(this) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0fdfaf300e95d97affb3b4c52649d0e2493146bd..a541d660cd5c6cbe4ad12e915dd76e0f9e5b6aa2 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -34,6 +34,7 @@ import scala.util.Try import scala.util.control.{ControlThrowable, NonFatal} import com.google.common.io.{ByteStreams, Files} +import com.google.common.net.InetAddresses import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.commons.lang3.SystemUtils import org.apache.hadoop.conf.Configuration @@ -789,13 +790,12 @@ private[spark] object Utils extends Logging { * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4). * Note, this is typically not used from within core spark. */ - lazy val localIpAddress: String = findLocalIpAddress() - lazy val localIpAddressHostname: String = getAddressHostName(localIpAddress) + private lazy val localIpAddress: InetAddress = findLocalInetAddress() - private def findLocalIpAddress(): String = { + private def findLocalInetAddress(): InetAddress = { val defaultIpOverride = System.getenv("SPARK_LOCAL_IP") if (defaultIpOverride != null) { - defaultIpOverride + InetAddress.getByName(defaultIpOverride) } else { val address = InetAddress.getLocalHost if (address.isLoopbackAddress) { @@ -806,15 +806,20 @@ private[spark] object Utils extends Logging { // It's more proper to pick ip address following system output order. val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.toList val reOrderedNetworkIFs = if (isWindows) activeNetworkIFs else activeNetworkIFs.reverse + for (ni <- reOrderedNetworkIFs) { - for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress && - !addr.isLoopbackAddress && addr.isInstanceOf[Inet4Address]) { + val addresses = ni.getInetAddresses.toList + .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress) + if (addresses.nonEmpty) { + val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head) + // because of Inet6Address.toHostName may add interface at the end if it knows about it + val strippedAddress = InetAddress.getByAddress(addr.getAddress) // We've found an address that looks reasonable! logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" + - " a loopback address: " + address.getHostAddress + "; using " + addr.getHostAddress + - " instead (on interface " + ni.getName + ")") + " a loopback address: " + address.getHostAddress + "; using " + + strippedAddress.getHostAddress + " instead (on interface " + ni.getName + ")") logWarning("Set SPARK_LOCAL_IP if you need to bind to another address") - return addr.getHostAddress + return strippedAddress } } logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" + @@ -822,7 +827,7 @@ private[spark] object Utils extends Logging { " external IP address!") logWarning("Set SPARK_LOCAL_IP if you need to bind to another address") } - address.getHostAddress + address } } @@ -842,11 +847,14 @@ private[spark] object Utils extends Logging { * Get the local machine's hostname. */ def localHostName(): String = { - customHostname.getOrElse(localIpAddressHostname) + customHostname.getOrElse(localIpAddress.getHostAddress) } - def getAddressHostName(address: String): String = { - InetAddress.getByName(address).getHostName + /** + * Get the local machine's URI. + */ + def localHostNameForURI(): String = { + customHostname.getOrElse(InetAddresses.toUriString(localIpAddress)) } def checkHost(host: String, message: String = "") { diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 1bd1f324298e7307264c7136d2fbaea10acdf17e..a7fe4476cacb8aff33bb77dba483ef3abb5316ec 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -23,6 +23,7 @@ import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.Duration import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.util.Utils import com.amazonaws.auth.AWSCredentialsProvider import com.amazonaws.auth.DefaultAWSCredentialsProviderChain @@ -118,7 +119,7 @@ private[kinesis] class KinesisReceiver( * method. */ override def onStart() { - workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID() + workerId = Utils.localHostName() + ":" + UUID.randomUUID() credentialsProvider = new DefaultAWSCredentialsProviderChain() kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName, credentialsProvider, workerId).withKinesisEndpoint(endpointUrl) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 158c2251597202baf9a750e1716e8b111947c891..97b46a01ba5b4f37a706a5b6c98abb67946ed76d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -22,6 +22,7 @@ import scala.collection.JavaConversions._ import org.apache.spark.scheduler.StatsReportListener import org.apache.spark.sql.hive.{HiveShim, HiveContext} import org.apache.spark.{Logging, SparkConf, SparkContext} +import org.apache.spark.util.Utils /** A singleton object for the master program. The slaves should not access this. */ private[hive] object SparkSQLEnv extends Logging { @@ -37,7 +38,7 @@ private[hive] object SparkSQLEnv extends Logging { val maybeKryoReferenceTracking = sparkConf.getOption("spark.kryo.referenceTracking") sparkConf - .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}") + .setAppName(s"SparkSQL::${Utils.localHostName()}") .set("spark.sql.hive.version", HiveShim.version) .set( "spark.serializer",