diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
index 7b735133e3d14b2c819edfe0c17bffdbf527ad32..948af5947f5e191c1ae8d690bc6b16a1794e7528 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -131,6 +131,15 @@ class SparkSink extends AbstractSink with Logging with Configurable {
     blockingLatch.await()
     Status.BACKOFF
   }
+
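+  /** Return the port the sink's server is listening on. Valid only after start(). */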
+  private[flume] def getPort(): Int = {
+    serverOpt
+      .map(_.getPort)
+      .getOrElse(
+        throw new RuntimeException("Server was not started!")
+      )
+  }
 }
 
 /**
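
The getPort() accessor above is what makes a port-0 configuration usable from the
tests below: once start() has bound the sink's server, callers can read back the
port the OS actually assigned, and calling it earlier fails fast instead of
returning a stale or bogus value. The same pattern in miniature, sketched with a
plain java.net.ServerSocket rather than the sink's real server (illustrative
only, not code from this patch):

    import java.net.ServerSocket

    // Bind to port 0: the kernel picks and reserves a free port atomically
    // at bind time; getLocalPort is the same read-back step that getPort()
    // provides for SparkSink.
    val socket = new ServerSocket(0)
    try {
      println(s"bound to port ${socket.getLocalPort}")
    } finally {
      socket.close()
    }
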
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index a69baa16981a18a2a3b42dd93ed1ba11208c307a..8a85b0f987e42f094764f1324f2839e2d06b8e49 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -39,9 +39,6 @@ import org.apache.spark.util.Utils
 
 class FlumePollingStreamSuite extends TestSuiteBase {
 
-  val random = new Random()
-  /** Return a port in the ephemeral range. */
-  def getTestPort = random.nextInt(16382) + 49152
   val batchCount = 5
   val eventsPerBatch = 100
   val totalEventsPerChannel = batchCount * eventsPerBatch
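
The removed helper only guessed at a free port: nothing reserved
random.nextInt(16382) + 49152 between picking it and sink.start(), so a parallel
test run or any other process could bind it first, a classic check-then-act race
and the likely source of this suite's flakiness. Configuring the sink with port 0
instead delegates the choice to the kernel, which hands out a genuinely free port
at bind time.
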
@@ -77,17 +74,6 @@ class FlumePollingStreamSuite extends TestSuiteBase {
   }
 
   private def testFlumePolling(): Unit = {
-    val testPort = getTestPort
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)),
-        StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
-    outputStream.register()
-
     // Start the channel and sink.
     val context = new Context()
     context.put("capacity", channelCapacity.toString)
@@ -98,10 +84,19 @@ class FlumePollingStreamSuite extends TestSuiteBase {
 
     val sink = new SparkSink()
     context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
     Configurables.configure(sink, context)
     sink.setChannel(channel)
     sink.start()
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", sink.getPort())),
+        StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    outputStream.register()
     ssc.start()
 
     writeAndVerify(Seq(channel), ssc, outputBuffer)
@@ -111,18 +106,6 @@ class FlumePollingStreamSuite extends TestSuiteBase {
   }
 
   private def testFlumePollingMultipleHost(): Unit = {
-    val testPort = getTestPort
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))
-    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
-        eventsPerBatch, 5)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
-    outputStream.register()
-
     // Start the channel and sink.
     val context = new Context()
     context.put("capacity", channelCapacity.toString)
@@ -136,17 +119,29 @@ class FlumePollingStreamSuite extends TestSuiteBase {
 
     val sink = new SparkSink()
     context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
     Configurables.configure(sink, context)
     sink.setChannel(channel)
     sink.start()
 
     val sink2 = new SparkSink()
     context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort + 1))
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
     Configurables.configure(sink2, context)
     sink2.setChannel(channel2)
     sink2.start()
+
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val addresses = Seq(sink.getPort(), sink2.getPort()).map(new InetSocketAddress("localhost", _))
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
+        eventsPerBatch, 5)
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    outputStream.register()
+
     ssc.start()
     writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
     assertChannelIsEmpty(channel)
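
Note the reordering in both tests: the StreamingContext and polling stream are now
built after sink.start(), because getPort() only has a value once the server is
bound. A hypothetical sanity check of that contract (not part of this patch; it
assumes a default-configured MemoryChannel is enough for start() and that
ScalaTest's intercept is in scope, as it is in this suite):

    import org.apache.flume.Context
    import org.apache.flume.channel.MemoryChannel
    import org.apache.flume.conf.Configurables
    import org.apache.spark.streaming.flume.sink.{SparkSink, SparkSinkConfig}

    val channel = new MemoryChannel()
    Configurables.configure(channel, new Context())

    val sink = new SparkSink()
    val ctx = new Context()
    ctx.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
    ctx.put(SparkSinkConfig.CONF_PORT, "0")  // let the OS choose
    Configurables.configure(sink, ctx)
    sink.setChannel(channel)

    intercept[RuntimeException] { sink.getPort() }  // server not started yet
    sink.start()
    assert(sink.getPort() > 0)  // the OS-assigned ephemeral port
    sink.stop()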