Skip to content
Snippets Groups Projects
Commit 74fb2ecf authored by jerryshao's avatar jerryshao Committed by Patrick Wendell
Browse files

[SPARK-3615][Streaming]Fix Kafka unit test hard coded Zookeeper port issue

Details can be seen in [SPARK-3615](https://issues.apache.org/jira/browse/SPARK-3615).

Author: jerryshao <saisai.shao@intel.com>

Closes #2483 from jerryshao/SPARK_3615 and squashes the following commits:

8555563 [jerryshao] Fix Kafka unit test hard coded Zookeeper port issue
parent bb96012b
No related branches found
No related tags found
No related merge requests found
......@@ -81,7 +81,7 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements S
Predef.<Tuple2<String, Object>>conforms()));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", testSuite.zkConnect());
kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort());
kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
kafkaParams.put("auto.offset.reset", "smallest");
......
......@@ -24,7 +24,7 @@ import java.util.{Properties, Random}
import scala.collection.mutable
import kafka.admin.CreateTopicCommand
import kafka.common.TopicAndPartition
import kafka.common.{KafkaException, TopicAndPartition}
import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
import kafka.utils.ZKStringSerializer
import kafka.serializer.{StringDecoder, StringEncoder}
......@@ -42,14 +42,13 @@ import org.apache.spark.util.Utils
class KafkaStreamSuite extends TestSuiteBase {
import KafkaTestUtils._
val zkConnect = "localhost:2181"
val zkHost = "localhost"
var zkPort: Int = 0
val zkConnectionTimeout = 6000
val zkSessionTimeout = 6000
val brokerPort = 9092
val brokerProps = getBrokerConfig(brokerPort, zkConnect)
val brokerConf = new KafkaConfig(brokerProps)
protected var brokerPort = 9092
protected var brokerConf: KafkaConfig = _
protected var zookeeper: EmbeddedZookeeper = _
protected var zkClient: ZkClient = _
protected var server: KafkaServer = _
......@@ -59,16 +58,35 @@ class KafkaStreamSuite extends TestSuiteBase {
override def beforeFunction() {
// Zookeeper server startup
zookeeper = new EmbeddedZookeeper(zkConnect)
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
logInfo("==================== 0 ====================")
zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
ZKStringSerializer)
logInfo("==================== 1 ====================")
// Kafka broker startup
server = new KafkaServer(brokerConf)
logInfo("==================== 2 ====================")
server.startup()
logInfo("==================== 3 ====================")
var bindSuccess: Boolean = false
while(!bindSuccess) {
try {
val brokerProps = getBrokerConfig(brokerPort, s"$zkHost:$zkPort")
brokerConf = new KafkaConfig(brokerProps)
server = new KafkaServer(brokerConf)
logInfo("==================== 2 ====================")
server.startup()
logInfo("==================== 3 ====================")
bindSuccess = true
} catch {
case e: KafkaException =>
if (e.getMessage != null && e.getMessage.contains("Socket server failed to bind to")) {
brokerPort += 1
}
case e: Exception => throw new Exception("Kafka server create failed", e)
}
}
Thread.sleep(2000)
logInfo("==================== 4 ====================")
super.beforeFunction()
......@@ -92,7 +110,7 @@ class KafkaStreamSuite extends TestSuiteBase {
createTopic(topic)
produceAndSendMessage(topic, sent)
val kafkaParams = Map("zookeeper.connect" -> zkConnect,
val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
"group.id" -> s"test-consumer-${random.nextInt(10000)}",
"auto.offset.reset" -> "smallest")
......@@ -200,6 +218,8 @@ object KafkaTestUtils {
factory.configure(new InetSocketAddress(ip, port), 16)
factory.startup(zookeeper)
val actualPort = factory.getLocalPort
def shutdown() {
factory.shutdown()
Utils.deleteRecursively(snapshotDir)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment