diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index 4c1d6a03eb2b8419cc82d8db1a0a2b505274e8dd..c0669fb33665769423513babda2b828c5e5c4cee 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -18,9 +18,7 @@
 package org.apache.spark.streaming.kafka;
 
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Arrays;
+import java.util.*;
 
 import scala.Tuple2;
 
@@ -116,7 +114,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
     );
     JavaDStream<String> unifiedStream = stream1.union(stream2);
 
-    final HashSet<String> result = new HashSet<String>();
+    final Set<String> result = Collections.synchronizedSet(new HashSet<String>());
     unifiedStream.foreachRDD(
         new Function<JavaRDD<String>, Void>() {
           @Override
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 540f4ceabab47259ac8dc4727e74f3c0dd5070be..e4c659215b767ddb3a1cd2851838923bfdd0f5a3 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -18,9 +18,7 @@
 package org.apache.spark.streaming.kafka;
 
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
+import java.util.*;
 
 import scala.Tuple2;
 
@@ -94,7 +92,7 @@ public class JavaKafkaStreamSuite implements Serializable {
       topics,
       StorageLevel.MEMORY_ONLY_SER());
 
-    final HashMap<String, Long> result = new HashMap<String, Long>();
+    final Map<String, Long> result = Collections.synchronizedMap(new HashMap<String, Long>());
 
     JavaDStream<String> words = stream.map(
       new Function<Tuple2<String, String>, String>() {
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 47bbfb605850a5103228b62d5c7ce61d138bf330..212eb35c61b66453481f9bcfe351268b35f075aa 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -99,7 +99,8 @@ class DirectKafkaStreamSuite
         ssc, kafkaParams, topics)
     }
 
-    val allReceived = new ArrayBuffer[(String, String)]
+    val allReceived =
+      new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
 
     stream.foreachRDD { rdd =>
     // Get the offset ranges in the RDD
@@ -162,7 +163,7 @@ class DirectKafkaStreamSuite
       "Start offset not from latest"
     )
 
-    val collectedData = new mutable.ArrayBuffer[String]()
+    val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
     stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
     ssc.start()
     val newData = Map("b" -> 10)
@@ -208,7 +209,7 @@ class DirectKafkaStreamSuite
       "Start offset not from latest"
     )
 
-    val collectedData = new mutable.ArrayBuffer[String]()
+    val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
     stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
     ssc.start()
     val newData = Map("b" -> 10)
@@ -324,7 +325,8 @@ class DirectKafkaStreamSuite
         ssc, kafkaParams, Set(topic))
     }
 
-    val allReceived = new ArrayBuffer[(String, String)]
+    val allReceived =
+      new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
 
     stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
     ssc.start()
@@ -350,8 +352,8 @@ class DirectKafkaStreamSuite
 }
 
 object DirectKafkaStreamSuite {
-  val collectedData = new mutable.ArrayBuffer[String]()
-  var total = -1L
+  val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
+  @volatile var total = -1L
 
   class InputInfoCollector extends StreamingListener {
     val numRecordsSubmitted = new AtomicLong(0L)
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 8ee2cc660f84971ecee084cf516a3392abc60bbb..797b07f80d8eeafd58d13a183b55bfda5989fb33 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -65,7 +65,7 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
 
     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
-    val result = new mutable.HashMap[String, Long]()
+    val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
     stream.map(_._2).countByValue().foreachRDD { r =>
       val ret = r.collect()
       ret.toMap.foreach { kv =>
@@ -77,10 +77,7 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     ssc.start()
 
     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
-      assert(sent.size === result.size)
-      sent.keys.foreach { k =>
-        assert(sent(k) === result(k).toInt)
-      }
+      assert(sent === result)
     }
   }
 }