Skip to content
Snippets Groups Projects
Commit 0765af9b authored by Hari Shreedharan's avatar Hari Shreedharan Committed by Josh Rosen
Browse files

[SPARK-4905][STREAMING] FlumeStreamSuite fix.

Using String constructor instead of CharsetDecoder to see if it fixes the issue of empty strings in Flume test output.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #4371 from harishreedharan/Flume-stream-attempted-fix and squashes the following commits:

550d363 [Hari Shreedharan] Fix imports.
8695950 [Hari Shreedharan] Use Charsets.UTF_8 instead of "UTF-8" in String constructors.
af3ba14 [Hari Shreedharan] [SPARK-4905][STREAMING] FlumeStreamSuite fix.
parent 6fe70d84
No related branches found
No related tags found
No related merge requests found
......@@ -19,13 +19,13 @@ package org.apache.spark.streaming.flume
import java.net.{InetSocketAddress, ServerSocket}
import java.nio.ByteBuffer
import java.nio.charset.Charset
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
import com.google.common.base.Charsets
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.source.avro
......@@ -108,7 +108,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
val inputEvents = input.map { item =>
val event = new AvroFlumeEvent
event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8")))
event.setBody(ByteBuffer.wrap(item.getBytes(Charsets.UTF_8)))
event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
event
}
......@@ -138,14 +138,13 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
status should be (avro.Status.OK)
}
val decoder = Charset.forName("UTF-8").newDecoder()
eventually(timeout(10 seconds), interval(100 milliseconds)) {
val outputEvents = outputBuffer.flatten.map { _.event }
outputEvents.foreach {
event =>
event.getHeaders.get("test") should be("header")
}
val output = outputEvents.map(event => decoder.decode(event.getBody()).toString)
val output = outputEvents.map(event => new String(event.getBody.array(), Charsets.UTF_8))
output should be (input)
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment