Skip to content
Snippets Groups Projects
Commit 89fd9bd0 authored by Josh Rosen's avatar Josh Rosen
Browse files

[SPARK-11887] Close PersistenceEngine at the end of PersistenceEngineSuite tests

In PersistenceEngineSuite, we do not call `close()` on the PersistenceEngine at the end of the test. For the ZooKeeperPersistenceEngine, this causes us to leak a ZooKeeper client, causing the logs of unrelated tests to be periodically spammed with connection error messages from that client:

```
15/11/20 05:13:35.789 pool-1-thread-1-ScalaTest-running-PersistenceEngineSuite-SendThread(localhost:15741) INFO ClientCnxn: Opening socket connection to server localhost/127.0.0.1:15741. Will not attempt to authenticate using SASL (unknown error)
15/11/20 05:13:35.790 pool-1-thread-1-ScalaTest-running-PersistenceEngineSuite-SendThread(localhost:15741) WARN ClientCnxn: Session 0x15124ff48dd0000 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1068)
```

This patch fixes this by using a `finally` block.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #9864 from JoshRosen/close-zookeeper-client-in-tests.
parent be7a2cfd
No related branches found
No related tags found
No related merge requests found
......@@ -63,56 +63,60 @@ class PersistenceEngineSuite extends SparkFunSuite {
conf: SparkConf, persistenceEngineCreator: Serializer => PersistenceEngine): Unit = {
val serializer = new JavaSerializer(conf)
val persistenceEngine = persistenceEngineCreator(serializer)
persistenceEngine.persist("test_1", "test_1_value")
assert(Seq("test_1_value") === persistenceEngine.read[String]("test_"))
persistenceEngine.persist("test_2", "test_2_value")
assert(Set("test_1_value", "test_2_value") === persistenceEngine.read[String]("test_").toSet)
persistenceEngine.unpersist("test_1")
assert(Seq("test_2_value") === persistenceEngine.read[String]("test_"))
persistenceEngine.unpersist("test_2")
assert(persistenceEngine.read[String]("test_").isEmpty)
// Test deserializing objects that contain RpcEndpointRef
val testRpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
try {
// Create a real endpoint so that we can test RpcEndpointRef deserialization
val workerEndpoint = testRpcEnv.setupEndpoint("worker", new RpcEndpoint {
override val rpcEnv: RpcEnv = testRpcEnv
})
val workerToPersist = new WorkerInfo(
id = "test_worker",
host = "127.0.0.1",
port = 10000,
cores = 0,
memory = 0,
endpoint = workerEndpoint,
webUiPort = 0,
publicAddress = ""
)
persistenceEngine.addWorker(workerToPersist)
val (storedApps, storedDrivers, storedWorkers) =
persistenceEngine.readPersistedData(testRpcEnv)
assert(storedApps.isEmpty)
assert(storedDrivers.isEmpty)
// Check deserializing WorkerInfo
assert(storedWorkers.size == 1)
val recoveryWorkerInfo = storedWorkers.head
assert(workerToPersist.id === recoveryWorkerInfo.id)
assert(workerToPersist.host === recoveryWorkerInfo.host)
assert(workerToPersist.port === recoveryWorkerInfo.port)
assert(workerToPersist.cores === recoveryWorkerInfo.cores)
assert(workerToPersist.memory === recoveryWorkerInfo.memory)
assert(workerToPersist.endpoint === recoveryWorkerInfo.endpoint)
assert(workerToPersist.webUiPort === recoveryWorkerInfo.webUiPort)
assert(workerToPersist.publicAddress === recoveryWorkerInfo.publicAddress)
persistenceEngine.persist("test_1", "test_1_value")
assert(Seq("test_1_value") === persistenceEngine.read[String]("test_"))
persistenceEngine.persist("test_2", "test_2_value")
assert(Set("test_1_value", "test_2_value") === persistenceEngine.read[String]("test_").toSet)
persistenceEngine.unpersist("test_1")
assert(Seq("test_2_value") === persistenceEngine.read[String]("test_"))
persistenceEngine.unpersist("test_2")
assert(persistenceEngine.read[String]("test_").isEmpty)
// Test deserializing objects that contain RpcEndpointRef
val testRpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
try {
// Create a real endpoint so that we can test RpcEndpointRef deserialization
val workerEndpoint = testRpcEnv.setupEndpoint("worker", new RpcEndpoint {
override val rpcEnv: RpcEnv = testRpcEnv
})
val workerToPersist = new WorkerInfo(
id = "test_worker",
host = "127.0.0.1",
port = 10000,
cores = 0,
memory = 0,
endpoint = workerEndpoint,
webUiPort = 0,
publicAddress = ""
)
persistenceEngine.addWorker(workerToPersist)
val (storedApps, storedDrivers, storedWorkers) =
persistenceEngine.readPersistedData(testRpcEnv)
assert(storedApps.isEmpty)
assert(storedDrivers.isEmpty)
// Check deserializing WorkerInfo
assert(storedWorkers.size == 1)
val recoveryWorkerInfo = storedWorkers.head
assert(workerToPersist.id === recoveryWorkerInfo.id)
assert(workerToPersist.host === recoveryWorkerInfo.host)
assert(workerToPersist.port === recoveryWorkerInfo.port)
assert(workerToPersist.cores === recoveryWorkerInfo.cores)
assert(workerToPersist.memory === recoveryWorkerInfo.memory)
assert(workerToPersist.endpoint === recoveryWorkerInfo.endpoint)
assert(workerToPersist.webUiPort === recoveryWorkerInfo.webUiPort)
assert(workerToPersist.publicAddress === recoveryWorkerInfo.publicAddress)
} finally {
testRpcEnv.shutdown()
testRpcEnv.awaitTermination()
}
} finally {
testRpcEnv.shutdown()
testRpcEnv.awaitTermination()
persistenceEngine.close()
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment