From 42dfab7d374cf64a39b692ebc089792a4ff7e42c Mon Sep 17 00:00:00 2001
From: Daoyuan <daoyuan.wang@intel.com>
Date: Thu, 24 Jul 2014 00:09:36 -0700
Subject: [PATCH] [SPARK-2661][bagel]unpersist old processed rdd

Unpersist useless rdd during bagel iteration to make full use of memory.

Author: Daoyuan <daoyuan.wang@intel.com>

Closes #1519 from adrian-wang/bagelunpersist and squashes the following commits:

182c9dd [Daoyuan] rename var nextUseless to lastRDD
87fd3a4 [Daoyuan] bagel unpersist old processed rdd
---
 bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index 70a99b33d7..ef0bb2ac13 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -72,6 +72,7 @@ object Bagel extends Logging {
     var verts = vertices
     var msgs = messages
     var noActivity = false
+    var lastRDD: RDD[(K, (V, Array[M]))] = null
     do {
       logInfo("Starting superstep " + superstep + ".")
       val startTime = System.currentTimeMillis
@@ -83,6 +84,10 @@ object Bagel extends Logging {
       val superstep_ = superstep  // Create a read-only copy of superstep for capture in closure
       val (processed, numMsgs, numActiveVerts) =
         comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
+      if (lastRDD != null) {
+        lastRDD.unpersist(false)
+      }
+      lastRDD = processed
 
       val timeTaken = System.currentTimeMillis - startTime
       logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
-- 
GitLab