diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 3c2f9c4616fe206ca7ddd7d1e004d9d4b445279e..47829a431e871489b622ae0ea5e050163e0b1427 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -1,10 +1,8 @@
 package spark.bagel
 
 import org.scalatest.{FunSuite, Assertions, BeforeAndAfter}
-import org.scalatest.prop.Checkers
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.SpanSugar._
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -13,7 +11,7 @@ import spark._
 class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
 class TestMessage(val targetId: String) extends Message[String] with Serializable
 
-class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
+class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts {
   
   var sc: SparkContext = _
   
@@ -25,7 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
   }
-  
+
   test("halting by voting") {
     sc = new SparkContext("local", "test")
     val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(true, 0))))
@@ -36,8 +34,9 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
         (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
           (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
       }
-    for ((id, vert) <- result.collect)
+    for ((id, vert) <- result.collect) {
       assert(vert.age === numSupersteps)
+    }
   }
 
   test("halting by message silence") {
@@ -57,7 +56,27 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
             }
         (new TestVertex(self.active, self.age + 1), msgsOut)
       }
-    for ((id, vert) <- result.collect)
+    for ((id, vert) <- result.collect) {
       assert(vert.age === numSupersteps)
+    }
+  }
+
+  test("large number of iterations") {
+    // This tests whether jobs with a large number of iterations finish in a reasonable time,
+    // because non-memoized recursion in RDD or DAGScheduler used to cause them to hang
+    failAfter(10 seconds) {
+      sc = new SparkContext("local", "test")
+      val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
+      val msgs = sc.parallelize(Array[(String, TestMessage)]())
+      val numSupersteps = 50
+      val result =
+        Bagel.run(sc, verts, msgs, sc.defaultParallelism) {
+          (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
+            (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
+        }
+      for ((id, vert) <- result.collect) {
+        assert(vert.age === numSupersteps)
+      }
+    }
   }
 }
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 6abb5c4792cc24954223c109b08671168b67c8c4..f6e927a989c12636e1a2008a7bc5e73e2fdd7593 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -636,16 +636,22 @@ abstract class RDD[T: ClassManifest](
   /** The [[spark.SparkContext]] that this RDD was created on. */
   def context = sc
 
+  // Avoid handling doCheckpoint multiple times to prevent excessive recursion
+  private var doCheckpointCalled = false
+
   /**
    * Performs the checkpointing of this RDD by saving this. It is called by the DAGScheduler
    * after a job using this RDD has completed (therefore the RDD has been materialized and
    * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
    */
   private[spark] def doCheckpoint() {
-    if (checkpointData.isDefined) {
-      checkpointData.get.doCheckpoint()
-    } else {
-      dependencies.foreach(_.rdd.doCheckpoint())
+    if (!doCheckpointCalled) {
+      doCheckpointCalled = true
+      if (checkpointData.isDefined) {
+        checkpointData.get.doCheckpoint()
+      } else {
+        dependencies.foreach(_.rdd.doCheckpoint())
+      }
     }
   }