Skip to content
Snippets Groups Projects
Commit 90d384dc authored by Shixiong Zhu's avatar Shixiong Zhu Committed by Andrew Or
Browse files

[SPARK-11831][CORE][TESTS] Use port 0 to avoid port conflicts in tests

Use port 0 to fix port-contention-related flakiness

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #9841 from zsxwing/SPARK-11831.
parent 014c0f7a
No related branches found
No related tags found
No related merge requests found
...@@ -39,7 +39,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { ...@@ -39,7 +39,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
override def beforeAll(): Unit = { override def beforeAll(): Unit = {
val conf = new SparkConf() val conf = new SparkConf()
env = createRpcEnv(conf, "local", 12345) env = createRpcEnv(conf, "local", 0)
} }
override def afterAll(): Unit = { override def afterAll(): Unit = {
...@@ -76,7 +76,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { ...@@ -76,7 +76,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
} }
}) })
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345, clientMode = true) val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef // Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "send-remotely") val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "send-remotely")
try { try {
...@@ -130,7 +130,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { ...@@ -130,7 +130,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
} }
}) })
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345, clientMode = true) val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef // Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-remotely") val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-remotely")
try { try {
...@@ -158,7 +158,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { ...@@ -158,7 +158,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val shortProp = "spark.rpc.short.timeout" val shortProp = "spark.rpc.short.timeout"
conf.set("spark.rpc.retry.wait", "0") conf.set("spark.rpc.retry.wait", "0")
conf.set("spark.rpc.numRetries", "1") conf.set("spark.rpc.numRetries", "1")
val anotherEnv = createRpcEnv(conf, "remote", 13345, clientMode = true) val anotherEnv = createRpcEnv(conf, "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef // Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout") val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")
try { try {
...@@ -417,7 +417,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { ...@@ -417,7 +417,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
} }
}) })
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345, clientMode = true) val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef // Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "sendWithReply-remotely") val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "sendWithReply-remotely")
try { try {
...@@ -457,7 +457,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { ...@@ -457,7 +457,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
} }
}) })
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345, clientMode = true) val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef // Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef( val rpcEndpointRef = anotherEnv.setupEndpointRef(
"local", env.address, "sendWithReply-remotely-error") "local", env.address, "sendWithReply-remotely-error")
...@@ -497,7 +497,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { ...@@ -497,7 +497,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}) })
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345, clientMode = true) val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef // Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef( val rpcEndpointRef = anotherEnv.setupEndpointRef(
"local", env.address, "network-events") "local", env.address, "network-events")
...@@ -543,7 +543,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { ...@@ -543,7 +543,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
} }
}) })
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345, clientMode = true) val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef // Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef( val rpcEndpointRef = anotherEnv.setupEndpointRef(
"local", env.address, "sendWithReply-unserializable-error") "local", env.address, "sendWithReply-unserializable-error")
...@@ -571,8 +571,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { ...@@ -571,8 +571,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
conf.set("spark.authenticate", "true") conf.set("spark.authenticate", "true")
conf.set("spark.authenticate.secret", "good") conf.set("spark.authenticate.secret", "good")
val localEnv = createRpcEnv(conf, "authentication-local", 13345) val localEnv = createRpcEnv(conf, "authentication-local", 0)
val remoteEnv = createRpcEnv(conf, "authentication-remote", 14345, clientMode = true) val remoteEnv = createRpcEnv(conf, "authentication-remote", 0, clientMode = true)
try { try {
@volatile var message: String = null @volatile var message: String = null
...@@ -602,8 +602,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { ...@@ -602,8 +602,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
conf.set("spark.authenticate", "true") conf.set("spark.authenticate", "true")
conf.set("spark.authenticate.secret", "good") conf.set("spark.authenticate.secret", "good")
val localEnv = createRpcEnv(conf, "authentication-local", 13345) val localEnv = createRpcEnv(conf, "authentication-local", 0)
val remoteEnv = createRpcEnv(conf, "authentication-remote", 14345, clientMode = true) val remoteEnv = createRpcEnv(conf, "authentication-remote", 0, clientMode = true)
try { try {
localEnv.setupEndpoint("ask-authentication", new RpcEndpoint { localEnv.setupEndpoint("ask-authentication", new RpcEndpoint {
......
...@@ -40,7 +40,7 @@ class AkkaRpcEnvSuite extends RpcEnvSuite { ...@@ -40,7 +40,7 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
}) })
val conf = new SparkConf() val conf = new SparkConf()
val newRpcEnv = new AkkaRpcEnvFactory().create( val newRpcEnv = new AkkaRpcEnvFactory().create(
RpcEnvConfig(conf, "test", "localhost", 12346, new SecurityManager(conf), false)) RpcEnvConfig(conf, "test", "localhost", 0, new SecurityManager(conf), false))
try { try {
val newRef = newRpcEnv.setupEndpointRef("local", ref.address, "test_endpoint") val newRef = newRpcEnv.setupEndpointRef("local", ref.address, "test_endpoint")
assert(s"akka.tcp://local@${env.address}/user/test_endpoint" === assert(s"akka.tcp://local@${env.address}/user/test_endpoint" ===
...@@ -59,7 +59,7 @@ class AkkaRpcEnvSuite extends RpcEnvSuite { ...@@ -59,7 +59,7 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
val conf = SSLSampleConfigs.sparkSSLConfig() val conf = SSLSampleConfigs.sparkSSLConfig()
val securityManager = new SecurityManager(conf) val securityManager = new SecurityManager(conf)
val rpcEnv = new AkkaRpcEnvFactory().create( val rpcEnv = new AkkaRpcEnvFactory().create(
RpcEnvConfig(conf, "test", "localhost", 12346, securityManager, false)) RpcEnvConfig(conf, "test", "localhost", 0, securityManager, false))
try { try {
val uri = rpcEnv.uriOf("local", RpcAddress("1.2.3.4", 12345), "test_endpoint") val uri = rpcEnv.uriOf("local", RpcAddress("1.2.3.4", 12345), "test_endpoint")
assert("akka.ssl.tcp://local@1.2.3.4:12345/user/test_endpoint" === uri) assert("akka.ssl.tcp://local@1.2.3.4:12345/user/test_endpoint" === uri)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment