From d8b06f18dc3e35938d15099beac98221d6f528b5 Mon Sep 17 00:00:00 2001
From: Eric Liang <ekl@databricks.com>
Date: Fri, 8 Jul 2016 20:18:49 -0700
Subject: [PATCH] [SPARK-16432] Empty blocks fail to serialize due to assert in
 ChunkedByteBuffer

## What changes were proposed in this pull request?

It's possible to also change the callers to not pass in empty chunks, but it seems cleaner to just allow `ChunkedByteBuffer` to handle empty arrays. cc JoshRosen

## How was this patch tested?

Unit tests; also verified that the original reproduction case in https://github.com/apache/spark/pull/11748#issuecomment-230760283 is resolved.

Author: Eric Liang <ekl@databricks.com>

Closes #14099 from ericl/spark-16432.
---
 .../org/apache/spark/util/io/ChunkedByteBuffer.scala |  9 ++++-----
 .../org/apache/spark/io/ChunkedByteBufferSuite.scala | 12 ++++--------
 2 files changed, 8 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index fb4706e78d..89b0874e38 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -31,14 +31,13 @@ import org.apache.spark.storage.StorageUtils
  * Read-only byte buffer which is physically stored as multiple chunks rather than a single
  * contiguous array.
  *
- * @param chunks an array of [[ByteBuffer]]s. Each buffer in this array must be non-empty and have
- *               position == 0. Ownership of these buffers is transferred to the ChunkedByteBuffer,
- *               so if these buffers may also be used elsewhere then the caller is responsible for
- *               copying them as needed.
+ * @param chunks an array of [[ByteBuffer]]s. Each buffer in this array must have position == 0.
+ *               Ownership of these buffers is transferred to the ChunkedByteBuffer, so if these
+ *               buffers may also be used elsewhere then the caller is responsible for copying
+ *               them as needed.
  */
 private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   require(chunks != null, "chunks must not be null")
-  require(chunks.forall(_.limit() > 0), "chunks must be non-empty")
   require(chunks.forall(_.position() == 0), "chunks' positions must be 0")
 
   private[this] var disposed: Boolean = false
diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index f205d4f0d6..38b48a4c9e 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -38,12 +38,6 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
     emptyChunkedByteBuffer.toInputStream(dispose = true).close()
   }
 
-  test("chunks must be non-empty") {
-    intercept[IllegalArgumentException] {
-      new ChunkedByteBuffer(Array(ByteBuffer.allocate(0)))
-    }
-  }
-
   test("getChunks() duplicates chunks") {
     val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
     chunkedByteBuffer.getChunks().head.position(4)
@@ -63,8 +57,9 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
   }
 
   test("toArray()") {
+    val empty = ByteBuffer.wrap(Array[Byte]())
     val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))
-    val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes))
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes, empty))
     assert(chunkedByteBuffer.toArray === bytes.array() ++ bytes.array())
   }
 
@@ -79,9 +74,10 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
   }
 
   test("toInputStream()") {
+    val empty = ByteBuffer.wrap(Array[Byte]())
     val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte))
     val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte))
-    val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes1, bytes2))
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array(empty, bytes1, bytes2))
     assert(chunkedByteBuffer.size === bytes1.limit() + bytes2.limit())
 
     val inputStream = chunkedByteBuffer.toInputStream(dispose = false)
-- 
GitLab