From c2a01b8ca6581760d01d72ffe74770531918a015 Mon Sep 17 00:00:00 2001
From: Stephen Skeirik <skeirik2@illinois.edu>
Date: Sun, 1 Apr 2018 06:12:44 -0500
Subject: [PATCH] working on algorithm

---
 .../scala/org/apache/spark/SparkEnv.scala     |  2 +
 .../CoarseGrainedExecutorBackend.scala        |  6 ++
 .../org/apache/spark/executor/Executor.scala  |  8 +++
 .../main/scala/org/apache/spark/rdd/RDD.scala | 64 +++++++++++++++++++
 .../apache/spark/scheduler/DAGScheduler.scala | 61 +++++++++++++++++-
 .../spark/storage/memory/MemoryStore.scala    | 19 ++++++
 updates                                       | 30 +++++++++
 7 files changed, 188 insertions(+), 2 deletions(-)
 create mode 100755 updates

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 72123f2232..1101063032 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -70,6 +70,8 @@ class SparkEnv (
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {
 
+  var currentStage: Int = -1
+
   private[spark] var isStopped = false
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 9b62e4b1b7..852666260e 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -95,6 +95,12 @@ private[spark] class CoarseGrainedExecutorBackend(
     } else {
       val taskDesc = TaskDescription.decode(data.value)
       logInfo("Got assigned task " + taskDesc.taskId)
+      // SS {
+      val currentStageId = taskDesc.name.substring(
+        taskDesc.name.lastIndexOf(' ') + 1, taskDesc.name.lastIndexOf('.')).toInt
+      // logEarne("Current in Stage: " + currentStageId)
+      env.currentStage = currentStageId
+      env.blockManager.currentStage = currentStageId
+      // SS }
       executor.launchTask(this, taskDesc)
     }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2c3a8ef748..5738d15627 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -326,6 +326,14 @@ private[spark] class Executor(
         throw new TaskKilledException(killReason.get)
       }
 
+      // SS {
+      // logEarne("Task " + taskId + " is in Stage " + task.stageId + " and the depMap is " + task.depMap)
+      if (!env.blockManager.stageExInfos.contains(task.stageId)) {
+        env.blockManager.stageExInfos.put(task.stageId,
+          new StageExInfo(task.stageId, null, null, task.depMap, task.curRunningRddMap))
+      }
+      // SS }
+
       // The purpose of updating the epoch here is to invalidate executor map output status cache
       // in case FetchFailures have occurred.
      // In local mode `env.mapOutputTracker` will be
      // MapOutputTrackerMaster and its cache invalidation is not based on epoch numbers so
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 0574abdca3..9f3e3273f9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -214,6 +214,7 @@ abstract class RDD[T: ClassTag](
    */
   def unpersist(blocking: Boolean = true): this.type = {
     logInfo("Removing RDD " + id + " from persistence list")
+    sc.dagScheduler.renewDepMap(id)
     sc.unpersistRDD(id, blocking)
     storageLevel = StorageLevel.NONE
     this
@@ -313,6 +314,34 @@ abstract class RDD[T: ClassTag](
     ancestors.filterNot(_ == this).toSeq
   }
 
+  // SS {
+  /**
+   * Return the ids of this RDD's cached narrow ancestors --- a clone of getNarrowAncestors above
+   * that stops descending each path once a cached (persisted) ancestor is reached.
+   */
+  private[spark] def getNarrowCachedAncestors: Set[Int] = {
+    val cachedAncestors = new mutable.HashSet[Int]
+    val ancestors = new mutable.HashSet[RDD[_]]
+    def visit(rdd: RDD[_]): Unit = {
+      val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
+      val narrowParents = narrowDependencies.map(_.rdd)
+      val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
+      narrowParentsNotVisited.foreach { parent =>
+        ancestors.add(parent)
+        if (parent.getStorageLevel != StorageLevel.NONE) {
+          cachedAncestors.add(parent.id)
+        } else {
+          visit(parent)
+        }
+      }
+    }
+
+    visit(this)
+
+    cachedAncestors.filterNot(_ == this.id).toSet
+  }
+  // SS }
+
   /**
    * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
    */
@@ -331,6 +360,41 @@ abstract class RDD[T: ClassTag](
   private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
     val blockId = RDDBlockId(id, partition.index)
     var readCachedBlock = true
+    // SS {
+    val blockManager = SparkEnv.get.blockManager
+
+    if (!blockManager.blockExInfo.containsKey(blockId)) {
+      blockManager.blockExInfo.put(blockId, new BlockExInfo(blockId))
+    }
+
+    blockManager.stageExInfos.get(blockManager.currentStage) match {
+      case Some(curStageExInfo) =>
+        var parExist = true
+        for (par <- curStageExInfo.depMap(id)) {
+          val parBlockId = new RDDBlockId(par, partition.index)
+          if (blockManager.blockExInfo.containsKey(parBlockId) &&
+              blockManager.blockExInfo.get(parBlockId).isExist == 1) {
+            // the parent block already exists; nothing to do
+          } else {
+            // the parent block does not exist yet, so add this block to its parent's watching set
+            parExist = false
+            if (!blockManager.blockExInfo.containsKey(parBlockId)) {
+              blockManager.blockExInfo.put(parBlockId, new BlockExInfo(parBlockId))
+            }
+            blockManager.blockExInfo.get(parBlockId).sonSet += blockId
+          }
+        }
+        if (parExist) {
+          // all parent blocks exist, so record the creation start time of this block
+          // logEarne("parents all exist, store start time of " + blockId)
+          blockManager.blockExInfo.get(blockId).creatStartTime = System.currentTimeMillis()
+        }
+      case None =>
+        // logEarne("Something is wrong: no StageExInfo for the current stage")
+    }
+    // SS }
     // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
     SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
       readCachedBlock = false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8c46a84323..77bed0cc62 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -162,6 +162,15 @@ class DAGScheduler(
 
   private[scheduler] val activeJobs = new HashSet[ActiveJob]
 
+  // SS {
+  // Stores data on RDD dependencies for better caching algorithms
+  private[scheduler] var preRDDs = new HashSet[RDD[_]]
+
+  private[scheduler] var depMap = new HashMap[Int, Set[Int]]
+
+  private[scheduler] var curRunningRddMap = new HashMap[Int, Set[Int]]
+  // SS }
+
   /**
    * Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids
    * and its values are arrays indexed by partition numbers. Each array value is the set of
@@ -944,6 +953,31 @@ class DAGScheduler(
     logDebug("missing: " + missing)
     if (missing.isEmpty) {
       logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
+      // SS {
+      // track dependency information when we submit a stage
+      env.blockManager.currentStage = stage.id
+      // logEarne("In " + stage + " we have cachedRDDs " + sc.getPersistentRDDs)
+      // logEarne(stage + " has narrowAncestors " + stage.rdd.getNarrowAncestors)
+      // logEarne("Before " + stage + " we have the RDDs below: " + preRDDs)
+      val curRDDs = stage.rdd.getNarrowAncestors ++ Seq(stage.rdd)
+      val newRDDs = curRDDs.filterNot(preRDDs.contains)
+      // logEarne("Stage " + stage + " will create the RDDs below: " + newRDDs)
+      val newCachedRDDs = newRDDs.filter(_.getStorageLevel != StorageLevel.NONE)
+      // logEarne("Stage " + stage + " has the newCachedRDDs below: " + newCachedRDDs)
+      curRunningRddMap.clear()
+      newCachedRDDs.foreach { cachedRdd =>
+        depMap.put(cachedRdd.id, cachedRdd.getNarrowCachedAncestors)
+        curRunningRddMap.put(cachedRdd.id, cachedRdd.getNarrowCachedAncestors)
+        logEarne("cachedRDD " + cachedRdd + " has the following cached ancestor RDDs: " +
+          cachedRdd.getNarrowCachedAncestors)
+      }
+      preRDDs = preRDDs ++ curRDDs
+      // logEarne("Current depMap is " + depMap)
+      if (!env.blockManager.stageExInfos.contains(stage.id)) {
+        env.blockManager.stageExInfos.put(stage.id,
+          new StageExInfo(stage.id, null, null, depMap.clone(), curRunningRddMap))
+      }
+      // SS }
       submitMissingTasks(stage, jobId.get)
     } else {
       for (parent <- missing) {
@@ -957,6 +991,25 @@ class DAGScheduler(
     }
   }
 
+  // SS {
+  /** Renew the dependency map when an RDD is unpersisted. */
+  def renewDepMap(id: Int): Unit = {
+    if (depMap.contains(id)) {
+      // logEarne("Remove RDD " + id + " from depMap")
+      // splice the unpersisted RDD's own cached ancestors into every entry that depended on it
+      val value = depMap(id)
+      depMap.foreach { rdd =>
+        if (rdd._2.contains(id)) {
+          val tmp = rdd._2 - id
+          depMap.put(rdd._1, tmp ++ value)
+        }
+      }
+      depMap.remove(id)
+      // logEarne("After removing RDD " + id + " the depMap is " + depMap)
+    }
+  }
+  // SS }
+
   /** Called when stage's parents are available and we can now do its task.
    */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
@@ -1060,9 +1113,11 @@ class DAGScheduler(
             val locs = taskIdToLocations(id)
             val part = partitions(id)
             stage.pendingPartitions += id
+            // SS {
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs,
               properties, serializedTaskMetrics, Option(jobId),
-              Option(sc.applicationId), sc.applicationAttemptId)
+              Option(sc.applicationId), sc.applicationAttemptId, depMap, curRunningRddMap)
+            // SS }
           }
 
         case stage: ResultStage =>
@@ -1070,9 +1125,11 @@ class DAGScheduler(
             val p: Int = stage.partitions(id)
             val part = partitions(p)
             val locs = taskIdToLocations(id)
+            // SS {
             new ResultTask(stage.id, stage.latestInfo.attemptNumber,
               taskBinary, part, locs, id, properties, serializedTaskMetrics,
-              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
+              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, depMap, curRunningRddMap)
+            // SS }
           }
       }
     } catch {
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 4cc5bcb7f9..c5fdada55a 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -463,6 +463,25 @@ private[spark] class MemoryStore(
         }
       }
 
+    // Prefer evicting in-memory blocks that are not outputs of the currently running stage.
+    blockManager.inMemBlockExInfo.synchronized {
+      val bxi_iterator = blockManager.inMemBlockExInfo.iterator()
+      while (freedMemory < space && bxi_iterator.hasNext) {
+        val bxi = bxi_iterator.next()
+        val entry = entries.get(bxi.blockId)
+        if (entry != null && blockIsEvictable(bxi.blockId, entry)) {
+          blockManager.stageExInfos.get(blockManager.currentStage) match {
+            case Some(curStageExInfo) =>
+              // skip blocks whose RDD is an output RDD of the current stage
+              if (!curStageExInfo.curRunningRddMap.contains(bxi.blockId.getRddId)) {
+                selectedBlocks += bxi.blockId
+                freedMemory += bxi.size
+              }
+            case None =>
+              // logEarne("ERROR: no StageExInfo for the current stage")
+          }
+        }
+      }
+    }
+
     def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
       val data = entry match {
         case DeserializedMemoryEntry(values, _, _) => Left(values)
diff --git a/updates b/updates
new file mode 100755
index 0000000000..9df2bcf24a
--- /dev/null
+++ b/updates
@@ -0,0 +1,30 @@
+Deleted:
+====================================================
+spark/CacheManager :: relevant function (getOrCompute) moved into spark/rdd/RDD
+
+Moved:
+====================================================
+spark/Logging -> spark/internal/Logging :: added a logEarne function, which just does plain logging
+spark/storage/MemoryStore -> spark/storage/memory/MemoryStore :: modified the tryToPut and ensureFreeSpace functions
+    1. tryToPut was removed and subdivided between
+       MemoryManager.acquireStorageMemory and entries.synchronized { entries.put }
+    2. ensureFreeSpace was renamed to evictBlocksToFreeSpace
+
+Modified:
+====================================================
+spark/SparkEnv :: added the currentStage variable
+spark/scheduler/DAGScheduler :: added preRDDs, depMap, curRunningRddMap --- based on getNarrowAncestors and
+    getNarrowCachedAncestors; added the renewDepMap function; added args to ShuffleMapTask/ResultTask
+spark/scheduler/ResultTask :: modified primary ctor to add arguments
+spark/scheduler/ShuffleMapTask :: modified primary/aux ctor to add arguments
+spark/scheduler/Task :: modified primary/aux ctor to add arguments
+spark/rdd/RDD :: modified unpersist, added getNarrowCachedAncestors
+spark/executor/Executor :: for each new stage, store a new StageExInfo object in the environment's stageExInfos
+spark/executor/CoarseGrainedExecutorBackend :: set currentStage on SparkEnv and BlockManager
+spark/storage/BlockId :: added getRddId, getRddSplitIndex functions
+spark/storage/BlockManager :: added the blockExInfo map, the inMemBlockExInfo set, and the stageExInfos map,
+    plus extra logic in removeRDD --- naturally, since that is where block removal/eviction happens
+
+Created:
+====================================================
+spark/StageExInfo :: stores info related to a stage, i.e. alreadyPerRddSet, afterPerRddSet, depMap,
+    curRunningRddMap (see the sketch after this patch)
+spark/storage/BlockExInfo :: stores the time taken to create the block and estimates de/serialization
+    time (see the sketch after this patch)
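Sketch (not part of the patch):
====================================================
The patch references StageExInfo, BlockExInfo, and the BlockId.getRddId / getRddSplitIndex helpers,
but their definitions are not included in this diff. Below is a minimal Scala sketch of what those
pieces could look like, inferred only from the call sites (StageExInfo(stageId, null, null, depMap,
curRunningRddMap); blockExInfo.get(...).isExist / .creatStartTime / .sonSet; bxi.blockId / bxi.size;
blockId.getRddId). All names, types, and constructor shapes here are assumptions, not the actual
implementation.

import scala.collection.mutable

import org.apache.spark.storage.{BlockId, RDDBlockId}

// Hedged sketch only: NOT part of the patch above. Reconstructed from how the patched code
// uses these classes; every field below is an assumption.

/** Per-stage execution info, assumed to be keyed by stage id in BlockManager.stageExInfos. */
class StageExInfo(
    val stageId: Int,
    val alreadyPerRddSet: mutable.HashSet[Int],            // RDDs already persisted (assumed)
    val afterPerRddSet: mutable.HashSet[Int],              // RDDs to persist after this stage (assumed)
    val depMap: mutable.HashMap[Int, Set[Int]],            // rddId -> ids of its cached narrow ancestors
    val curRunningRddMap: mutable.HashMap[Int, Set[Int]])  // newly cached RDDs produced by this stage
  extends Serializable

/** Per-block execution info, assumed to be keyed by BlockId in BlockManager.blockExInfo. */
class BlockExInfo(val blockId: BlockId) extends Serializable {
  var isExist: Int = 0              // set to 1 once the block has been materialized
  var creatStartTime: Long = 0L     // recorded once all parent blocks are present
  var size: Long = 0L               // block size in bytes, read by the eviction loop
  val sonSet: mutable.HashSet[BlockId] = mutable.HashSet.empty[BlockId]  // blocks waiting on this one
}

// Approximation of the helpers the notes say were added directly to BlockId; shown here as an
// implicit enrichment so the sketch stays self-contained.
object BlockIdExtensions {
  implicit class RichBlockId(val id: BlockId) extends AnyVal {
    def getRddId: Int = id match {
      case RDDBlockId(rddId, _) => rddId
      case _ => -1
    }
    def getRddSplitIndex: Int = id match {
      case RDDBlockId(_, splitIndex) => splitIndex
      case _ => -1
    }
  }
}

With definitions along these lines, the eviction loop added to MemoryStore can check whether an
in-memory block's RDD appears in curRunningRddMap for the current stage and prefer evicting blocks
that do not, while sonSet and creatStartTime give the BlockManager enough bookkeeping to estimate
the cost of recomputing a cached block.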