diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index ef8432ec0834ae593c2a7abd56c0a1391ea6dc46..7371f886575c62ac239a9282e1f42399cd5a2f17 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -79,8 +79,11 @@ private[spark] class SerializerManager(
     primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
   }
 
-  def getSerializer(ct: ClassTag[_]): Serializer = {
-    if (canUseKryo(ct)) {
+  // SPARK-18617: The feature added in SPARK-13990 cannot be applied to Spark Streaming yet. At
+  // worst, a streaming job based on `Receiver` mode cannot run properly on Spark 2.x. Disabling
+  // the `kryo auto pick` feature for streaming blocks is a reasonable first step.
+  def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
+    if (autoPick && canUseKryo(ct)) {
       kryoSerializer
     } else {
       defaultSerializer
@@ -161,7 +164,8 @@ private[spark] class SerializerManager(
       outputStream: OutputStream,
       values: Iterator[T]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = getSerializer(implicitly[ClassTag[T]]).newInstance()
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
     ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
   }
 
@@ -177,7 +181,8 @@ private[spark] class SerializerManager(
       classTag: ClassTag[_]): ChunkedByteBuffer = {
     val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
     val byteStream = new BufferedOutputStream(bbos)
-    val ser = getSerializer(classTag).newInstance()
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val ser = getSerializer(classTag, autoPick).newInstance()
     ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
     bbos.toChunkedByteBuffer
   }
@@ -191,7 +196,8 @@ private[spark] class SerializerManager(
       inputStream: InputStream)
       (classTag: ClassTag[T]): Iterator[T] = {
     val stream = new BufferedInputStream(inputStream)
-    getSerializer(classTag)
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    getSerializer(classTag, autoPick)
       .newInstance()
       .deserializeStream(wrapStream(blockId, stream))
       .asIterator.asInstanceOf[Iterator[T]]
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 095d32407f345c0cd920dbd755ee73ff6f0aee0a..fff21218b1769f5ff66f6bf86a828b40fd70e701 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -31,7 +31,7 @@ import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
-import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
+import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
@@ -334,7 +334,8 @@ private[spark] class MemoryStore(
     val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, allocator)
     redirectableStream.setOutputStream(bbos)
     val serializationStream: SerializationStream = {
-      val ser = serializerManager.getSerializer(classTag).newInstance()
+      val autoPick = !blockId.isInstanceOf[StreamBlockId]
+      val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
       ser.serializeStream(serializerManager.wrapStream(blockId, redirectableStream))
     }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
index ec4f2637fadd00f28a089dadb2758294471569cf..3050f9a2502350b5dae79c4382a3c35c74d6909e 100644
--- a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
@@ -67,7 +67,8 @@ class PartiallySerializedBlockSuite
       spy
     }
 
-    val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
+    val serializer = serializerManager
+      .getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
     val redirectableOutputStream = Mockito.spy(new RedirectableOutputStream)
     redirectableOutputStream.setOutputStream(bbos)
     val serializationStream = Mockito.spy(serializer.serializeStream(redirectableOutputStream))
@@ -182,7 +183,8 @@ class PartiallySerializedBlockSuite
       Mockito.verifyNoMoreInteractions(memoryStore)
       Mockito.verify(partiallySerializedBlock.getUnrolledChunkedByteBuffer, atLeastOnce).dispose()
 
-      val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
+      val serializer = serializerManager
+        .getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
       val deserialized =
         serializer.deserializeStream(new ByteBufferInputStream(bbos.toByteBuffer)).asIterator.toSeq
       assert(deserialized === items)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f1482e5c06cdca83c002d5292a5bff1e1af9fc14..45d8f50853431e2175a184db129cfdf75e3f0b41 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -806,6 +806,28 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     ssc.stop()
   }
 
+  test("SPARK-18560 Receiver data should be deserialized properly.") {
+    // Start a two-node cluster, so the receiver uses one node and Spark jobs use the other.
+    // The jobs then have to fetch remote blocks, which triggers SPARK-18560.
+    val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName)
+    ssc = new StreamingContext(conf, Milliseconds(100))
+    val input = ssc.receiverStream(new FakeByteArrayReceiver)
+    input.count().foreachRDD { rdd =>
+      // Make sure we can read from BlockRDD
+      if (rdd.collect().headOption.getOrElse(0L) > 0) {
+        // Stop StreamingContext to unblock "awaitTerminationOrTimeout"
+        new Thread() {
+          setDaemon(true)
+          override def run(): Unit = {
+            ssc.stop(stopSparkContext = true, stopGracefully = false)
+          }
+        }.start()
+      }
+    }
+    ssc.start()
+    ssc.awaitTerminationOrTimeout(60000)
+  }
+
   def addInputStream(s: StreamingContext): DStream[Int] = {
     val input = (1 to 100).map(i => 1 to i)
     val inputStream = new TestInputStream(s, input, 1)
@@ -869,6 +891,31 @@ object TestReceiver {
   val counter = new AtomicInteger(1)
 }
 
+class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging {
+
+  val data: Array[Byte] = "test".getBytes
+  var receivingThreadOption: Option[Thread] = None
+
+  override def onStart(): Unit = {
+    val thread = new Thread() {
+      override def run() {
+        logInfo("Receiving started")
+        while (!isStopped) {
+          store(data)
+        }
+        logInfo("Receiving stopped")
+      }
+    }
+    receivingThreadOption = Some(thread)
+    thread.start()
+  }
+
+  override def onStop(): Unit = {
+    // No cleanup needed; the receiving thread should stop on its own, so just wait for it.
+    receivingThreadOption.foreach(_.join())
+  }
+}
+
 /** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
 class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
   extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
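For context (not part of the patch): a minimal sketch of how the new autoPick flag behaves, assuming code that lives under the org.apache.spark package (SerializerManager is private[spark]); the package, object, and method names below are illustrative only.

package org.apache.spark.examplepkg  // hypothetical; must sit under org.apache.spark

import scala.reflect.ClassTag

import org.apache.spark.serializer.{Serializer, SerializerManager}
import org.apache.spark.storage.{BlockId, StreamBlockId}

object SerializerPickSketch {
  // Mirrors the call sites touched by this patch: receiver-written blocks are StreamBlockIds,
  // so auto-pick is disabled for them and the default (Java) serializer is used; other blocks
  // may still be auto-picked for Kryo when the class tag allows it.
  def chooseSerializer[T](serializerManager: SerializerManager, blockId: BlockId)
      (implicit ct: ClassTag[T]): Serializer = {
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    serializerManager.getSerializer(ct, autoPick)
  }
}

A stream block written by a receiver carries a StreamBlockId, so it is always serialized and deserialized with the default serializer on both ends, which is what the new StreamingContextSuite test exercises end to end on a local-cluster master.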