diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index f350784378795b852255fdab797ad5c92d79daa7..22d01c47e645d034c02fedf8ab49e6e7958d216f 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -207,11 +207,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) TorrentBroadcast.synchronized { setConf(SparkEnv.get.conf) val blockManager = SparkEnv.get.blockManager - blockManager.getLocalValues(broadcastId).map(_.data.next()) match { - case Some(x) => - releaseLock(broadcastId) - x.asInstanceOf[T] - + blockManager.getLocalValues(broadcastId) match { + case Some(blockResult) => + if (blockResult.data.hasNext) { + val x = blockResult.data.next().asInstanceOf[T] + releaseLock(broadcastId) + x + } else { + throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId") + } case None => logInfo("Started reading broadcast variable " + id) val startTimeMs = System.currentTimeMillis() diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index c08275c7e00e71f43d433be680df27c819099e99..fb54dd66a39a90cc129ce0dd0c2d4ab312838e22 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -702,7 +702,7 @@ private[storage] class PartiallyUnrolledIterator[T]( } override def next(): T = { - if (unrolled == null) { + if (unrolled == null || !unrolled.hasNext) { rest.next() } else { unrolled.next() diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index 973676398ae54f02ad94b25eca73cc6b1b87a584..6646068d5080ba89d7b27451ec296c5c8eca7fa6 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -137,6 +137,18 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("Cache broadcast to disk") { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test") + .set("spark.memory.useLegacyMode", "true") + .set("spark.storage.memoryFraction", "0.0") + sc = new SparkContext(conf) + val list = List[Int](1, 2, 3, 4) + val broadcast = sc.broadcast(list) + assert(broadcast.value.sum === 10) + } + /** * Verify the persistence of state associated with a TorrentBroadcast in a local-cluster. *