From 1fca9da95dc9b9aaf9ae75fd7456378861d8b409 Mon Sep 17 00:00:00 2001
From: cody koeninger <cody@koeninger.org>
Date: Tue, 5 Jul 2016 11:45:54 -0700
Subject: [PATCH] [SPARK-16212][STREAMING][KAFKA] use random port for embedded
 kafka

## What changes were proposed in this pull request?

Testing for 0.10 uncovered an issue with a fixed port number being used in KafkaTestUtils.  This is making a roughly equivalent fix for the 0.8 connector

## How was this patch tested?

Unit tests, manual tests

Author: cody koeninger <cody@koeninger.org>

Closes #14018 from koeninger/kafka-0-8-test-port.
---
 .../org/apache/spark/streaming/kafka/KafkaTestUtils.scala   | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index d9d4240c05..abfd7aad4c 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -35,6 +35,7 @@ import kafka.serializer.StringEncoder
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{ZKStringSerializer, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
+import org.apache.commons.lang3.RandomUtils
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 
 import org.apache.spark.SparkConf
@@ -62,7 +63,8 @@ private[kafka] class KafkaTestUtils extends Logging {
 
   // Kafka broker related configurations
   private val brokerHost = "localhost"
-  private var brokerPort = 9092
+  // 0.8.2 server doesn't have a boundPort method, so can't use 0 for a random port
+  private var brokerPort = RandomUtils.nextInt(1024, 65536)
   private var brokerConf: KafkaConfig = _
 
   // Kafka broker server
@@ -112,7 +114,7 @@ private[kafka] class KafkaTestUtils extends Logging {
       brokerConf = new KafkaConfig(brokerConfiguration)
       server = new KafkaServer(brokerConf)
       server.startup()
-      (server, port)
+      (server, brokerPort)
     }, new SparkConf(), "KafkaBroker")
 
     brokerReady = true
-- 
GitLab