From 0765af9b21e9204c410c7a849c7201bc3eda8cc3 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan <hshreedharan@apache.org>
Date: Mon, 9 Feb 2015 14:17:14 -0800
Subject: [PATCH] [SPARK-4905][STREAMING] FlumeStreamSuite fix.

Using String constructor instead of CharsetDecoder to see if it fixes the issue of empty strings in Flume test output.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #4371 from harishreedharan/Flume-stream-attempted-fix and squashes the following commits:

550d363 [Hari Shreedharan] Fix imports.
8695950 [Hari Shreedharan] Use Charsets.UTF_8 instead of "UTF-8" in String constructors.
af3ba14 [Hari Shreedharan] [SPARK-4905][STREAMING] FlumeStreamSuite fix.
---
 .../apache/spark/streaming/flume/FlumeStreamSuite.scala    | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index f333e3891b..322de7bf2f 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -19,13 +19,13 @@ package org.apache.spark.streaming.flume
 
 import java.net.{InetSocketAddress, ServerSocket}
 import java.nio.ByteBuffer
-import java.nio.charset.Charset
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import com.google.common.base.Charsets
 import org.apache.avro.ipc.NettyTransceiver
 import org.apache.avro.ipc.specific.SpecificRequestor
 import org.apache.flume.source.avro
@@ -108,7 +108,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
 
     val inputEvents = input.map { item =>
       val event = new AvroFlumeEvent
-      event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8")))
+      event.setBody(ByteBuffer.wrap(item.getBytes(Charsets.UTF_8)))
       event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
       event
     }
@@ -138,14 +138,13 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
       status should be (avro.Status.OK)
     }
     
-    val decoder = Charset.forName("UTF-8").newDecoder()    
     eventually(timeout(10 seconds), interval(100 milliseconds)) {
       val outputEvents = outputBuffer.flatten.map { _.event }
       outputEvents.foreach {
         event =>
           event.getHeaders.get("test") should be("header")
       }
-      val output = outputEvents.map(event => decoder.decode(event.getBody()).toString)
+      val output = outputEvents.map(event => new String(event.getBody.array(), Charsets.UTF_8))
       output should be (input)
     }
   }
-- 
GitLab