diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 72123f22325324b80aae910b9820d0c888d6150b..110106303262c418185b5dbe7405fe86107c0d72 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -70,6 +70,8 @@ class SparkEnv ( val outputCommitCoordinator: OutputCommitCoordinator, val conf: SparkConf) extends Logging { + var currentStage: Int = -1 + private[spark] var isStopped = false private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 9b62e4b1b71500c2f8c6e9e936bf5a9df0f4b3eb..852666260ead228b041e30dc436c7bae48e83a74 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -95,6 +95,12 @@ private[spark] class CoarseGrainedExecutorBackend( } else { val taskDesc = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId) + // SS { + val currentStageId = taskDesc.name.substring(taskDesc.name.lastIndexOf(' ') + 1, taskDesc.name.lastIndexOf('.')).toInt + // logEarne("Current in Stage: " + currentStageId) + env.currentStage = currentStageId + env.blockManager.currentStage = currentStageId + // SS } executor.launchTask(this, taskDesc) } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 2c3a8ef74800b64a2958a00d2f481cf040a38775..5738d15627c527ed5084c19a1e2ad948275ddac3 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -326,6 +326,14 @@ private[spark] class Executor( throw new TaskKilledException(killReason.get) } + // SS { + // logEarne("Task " + taskId + " is in Stage " + task.stageId + " and the depMap is " + task.depMap) + if (!env.blockManager.stageExInfos.contains(task.stageId)) { + env.blockManager.stageExInfos.put(task.stageId, + new StageExInfo(task.stageId, null, null, task.depMap, task.curRunningRddMap)) + } + // SS } + // The purpose of updating the epoch here is to invalidate executor map output status cache // in case FetchFailures have occurred. In local mode `env.mapOutputTracker` will be // MapOutputTrackerMaster and its cache invalidation is not based on epoch numbers so diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 0574abdca32ac1e428fe26c9d8764534c5923c2e..9f3e3273f9461f96d951f0a314594e7451c73d0f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -214,6 +214,7 @@ abstract class RDD[T: ClassTag]( */ def unpersist(blocking: Boolean = true): this.type = { logInfo("Removing RDD " + id + " from persistence list") + sc.dagScheduler.renewDepMap(id) sc.unpersistRDD(id, blocking) storageLevel = StorageLevel.NONE this @@ -313,6 +314,34 @@ abstract class RDD[T: ClassTag]( ancestors.filterNot(_ == this).toSeq } + // SS { + /** + * Return cached ancestor ids --- clone of above function. 
+   */
+  private[spark] def getNarrowCachedAncestors: Set[Int] = {
+    val cachedAncestors = new mutable.HashSet[Int]
+    val ancestors = new mutable.HashSet[RDD[_]]
+    def visit(rdd: RDD[_]): Unit = {
+      val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
+      val narrowParents = narrowDependencies.map(_.rdd)
+      val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
+      narrowParentsNotVisited.foreach { parent =>
+        ancestors.add(parent)
+        if (parent.getStorageLevel != StorageLevel.NONE) {
+          cachedAncestors.add(parent.id)
+        } else {
+          visit(parent)
+        }
+      }
+    }
+
+    visit(this)
+
+    cachedAncestors.filterNot(_ == this.id).toSet
+  }
+  // SS }
+
+
   /**
    * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
    */
@@ -331,6 +360,41 @@ abstract class RDD[T: ClassTag](
   private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
     val blockId = RDDBlockId(id, partition.index)
     var readCachedBlock = true
+    // SS {
+    val blockManager = SparkEnv.get.blockManager
+
+    if (!blockManager.blockExInfo.containsKey(blockId)) {
+      blockManager.blockExInfo.put(blockId, new BlockExInfo(blockId))
+    }
+
+    blockManager.stageExInfos.get(blockManager.currentStage) match {
+      case Some(curStageExInfo) =>
+        var parExist = true
+        for (par <- curStageExInfo.depMap(id)) {
+          val parBlockId = new RDDBlockId(par, partition.index)
+          if (blockManager.blockExInfo.containsKey(parBlockId) &&
+            blockManager.blockExInfo.get(parBlockId).isExist == 1) {
+            // the parent block is already materialized, nothing to do
+
+          } else {
+            // the parent block does not exist yet: register this block in its parent's watch set
+            parExist = false
+            if (!blockManager.blockExInfo.containsKey(parBlockId)) {
+              blockManager.blockExInfo.put(parBlockId, new BlockExInfo(parBlockId))
+            }
+            blockManager.blockExInfo.get(parBlockId).sonSet += blockId
+          }
+        }
+        if (parExist) {
+          // all parent blocks exist, so record this block's creation start time
+          // logEarne("parents all exist, store start time of " + blockId)
+          blockManager.blockExInfo.get(blockId).creatStartTime = System.currentTimeMillis()
+        }
+      case None =>
+        // logEarne("No StageExInfo for the current stage")
+    }
+    // SS }
     // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
     SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
       readCachedBlock = false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8c46a84323392ca52f7f581d417f907e4a9b2270..77bed0cc62c49ba053c1320b16a25df834f800b5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -162,6 +162,15 @@ class DAGScheduler(
 
   private[scheduler] val activeJobs = new HashSet[ActiveJob]
 
+  // SS {
+  // Stores data on RDD dependencies for better caching algorithms
+  private[scheduler] var preRDDs = new HashSet[RDD[_]]
+
+  private[scheduler] var depMap = new HashMap[Int, Set[Int]]
+
+  private[scheduler] var curRunningRddMap = new HashMap[Int, Set[Int]]
+  // SS }
+
   /**
    * Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids
    * and its values are arrays indexed by partition numbers. Each array value is the set of
@@ -944,6 +953,31 @@ class DAGScheduler(
       logDebug("missing: " + missing)
       if (missing.isEmpty) {
         logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
+        // SS {
+        // Track dependency information when we submit a stage.
+        env.blockManager.currentStage = stage.id
+        // logEarne("In " + stage + " we have cachedRDD " + sc.getPersistentRDDs)
+        // logEarne(stage + " has narrowAncestors " + stage.rdd.getNarrowAncestors)
+        // logEarne("Before " + stage + " we have RDDs as below: " + preRDDs)
+        val curRDDs = stage.rdd.getNarrowAncestors ++ Seq(stage.rdd)
+        val newRDDs = curRDDs.filterNot(preRDDs.contains)
+        // logEarne("Stage " + stage + " will create RDDs as below: " + newRDDs)
+        val newCachedRDDs = newRDDs.filter(_.getStorageLevel != StorageLevel.NONE)
+        // logEarne("Stage " + stage + " has newCachedRDDs as below: " + newCachedRDDs)
+        curRunningRddMap.clear()
+        newCachedRDDs.foreach { cachedRdd =>
+          val cachedAncestors = cachedRdd.getNarrowCachedAncestors
+          depMap.put(cachedRdd.id, cachedAncestors)
+          curRunningRddMap.put(cachedRdd.id, cachedAncestors)
+          logEarne("Cached RDD " + cachedRdd + " has cached ancestor RDDs: " + cachedAncestors)
+        }
+        preRDDs = preRDDs ++ curRDDs
+        // logEarne("Current depMap is " + depMap)
+        if (!env.blockManager.stageExInfos.contains(stage.id)) {
+          env.blockManager.stageExInfos.put(stage.id,
+            new StageExInfo(stage.id, null, null, depMap.clone(), curRunningRddMap.clone()))
+        }
+        // SS }
         submitMissingTasks(stage, jobId.get)
       } else {
         for (parent <- missing) {
@@ -957,6 +991,25 @@ class DAGScheduler(
     }
   }
 
+  // SS {
+  /** Renew the dependency map when an RDD is unpersisted. */
+  def renewDepMap(id: Int): Unit = {
+    if (depMap.contains(id)) {
+      // logEarne("Remove RDD " + id + " from depMap")
+      val value = depMap(id)
+      depMap.keys.toSeq.foreach { rddId =>
+        if (depMap(rddId).contains(id)) {
+          // Replace the unpersisted RDD with its own cached ancestors.
+          depMap.put(rddId, (depMap(rddId) - id) ++ value)
+        }
+      }
+      depMap.remove(id)
+      // logEarne("After removing RDD " + id + " the depMap is " + depMap)
+    }
+  }
+  // SS }
+
+
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
@@ -1060,9 +1113,11 @@ class DAGScheduler(
             val locs = taskIdToLocations(id)
             val part = partitions(id)
             stage.pendingPartitions += id
+            // SS {
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
               taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
-              Option(sc.applicationId), sc.applicationAttemptId)
+              Option(sc.applicationId), sc.applicationAttemptId, depMap, curRunningRddMap)
+            // SS }
           }
 
       case stage: ResultStage =>
@@ -1070,9 +1125,11 @@
             val p: Int = stage.partitions(id)
             val part = partitions(p)
             val locs = taskIdToLocations(id)
+            // SS {
             new ResultTask(stage.id, stage.latestInfo.attemptNumber,
               taskBinary, part, locs, id, properties, serializedTaskMetrics,
-              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
+              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, depMap, curRunningRddMap)
+            // SS }
           }
       }
     } catch {
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 4cc5bcb7f9bafa600a8fa42b87c723dffbddfa09..c5fdada55a44e5b48a03d0036ba06664e2b8b16a 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -463,6 +463,25 @@ private[spark] class MemoryStore(
         }
       }
 
+      blockManager.inMemBlockExInfo.synchronized {
+        val bxiIterator = blockManager.inMemBlockExInfo.iterator()
+        while (freedMemory < space && bxiIterator.hasNext) {
+          val bxi = bxiIterator.next()
+          val entry = entries.get(bxi.blockId)
+          if (entry != null && blockIsEvictable(bxi.blockId, entry)) {
+            blockManager.stageExInfos.get(blockManager.currentStage) match {
+              case Some(curStageExInfo) =>
+                // Skip blocks whose RDD is an output of the currently running stage.
+                if (!curStageExInfo.curRunningRddMap.contains(bxi.blockId.getRddId)) {
+                  selectedBlocks += bxi.blockId
+                  freedMemory += bxi.size
+                }
+              case None =>
+                // logEarne("No StageExInfo for the current stage")
+            }
+          }
+        }
+      }
+
       def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
         val data = entry match {
           case DeserializedMemoryEntry(values, _, _) => Left(values)
diff --git a/updates b/updates
new file mode 100755
index 0000000000000000000000000000000000000000..9df2bcf24a825cee862494a6380532dc98f0bf57
--- /dev/null
+++ b/updates
@@ -0,0 +1,30 @@
+Deleted:
+====================================================
+spark/CacheManager :: relevant function (getOrCompute) moved into spark/rdd/RDD
+
+Moved:
+====================================================
+spark/Logging -> spark/internal/Logging :: added a (logEarne) function, which is just an extra logging helper
+spark/storage/MemoryStore -> spark/storage/memory/MemoryStore :: modified the tryToPut and ensureFreeSpace functions
+   1. tryToPut was removed; its logic is split between
+      MemoryManager.acquireStorageMemory and an entries.synchronized block around entries.put
+   2. ensureFreeSpace was renamed to evictBlocksToFreeSpace
+
+Modified:
+====================================================
+spark/SparkEnv :: added the currentStage variable
+spark/scheduler/DAGScheduler :: added preRDDs, depMap, curRunningRddMap --- based on getNarrowAncestors and getNarrowCachedAncestors; added the renewDepMap function; added args to ShuffleMapTask/ResultTask
+spark/scheduler/ResultTask :: modified the primary ctor to add arguments
+spark/scheduler/ShuffleMapTask :: modified the primary/aux ctors to add arguments
+spark/scheduler/Task :: modified the primary/aux ctors to add arguments
+spark/rdd/RDD :: modified unpersist, added getNarrowCachedAncestors
+spark/executor/Executor :: for each new stage, store a new StageExInfo object in the BlockManager's stageExInfos map
+spark/executor/CoarseGrainedExecutorBackend :: set currentStage on SparkEnv and BlockManager
+spark/storage/BlockId :: added the getRddId and getRddSplitIndex functions
+spark/storage/BlockManager :: added the blockExInfo map, the inMemBlockExInfo set, the stageExInfos map,
+       plus extra logic in removeRDD, since that is where unpersisted RDD blocks are actually removed
+
+Created:
+====================================================
+spark/StageExInfo :: stores per-stage info, i.e. alreadyPerRddSet, afterPerRddSet, depMap, curRunningRddMap
+spark/storage/BlockExInfo :: stores block creation time and estimated de/serialization times
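
Sketch (not part of the diff): assumed shape of the new helper classes
====================================================
The patch above uses StageExInfo and BlockExInfo, but their sources are not
included in this diff. The following is a minimal Scala sketch of what the
references in the diff imply, not the author's code; every field or type not
explicitly named in the diff or in the notes above is an assumption.

import scala.collection.mutable

import org.apache.spark.storage.BlockId

// Per-stage bookkeeping, created by DAGScheduler.submitStage and by Executor
// when a task for a not-yet-seen stage arrives.
class StageExInfo(
    val stageId: Int,
    val alreadyPerRddSet: mutable.HashSet[Int],   // named in the notes; exact use assumed
    val afterPerRddSet: mutable.HashSet[Int],     // named in the notes; exact use assumed
    val depMap: mutable.HashMap[Int, Set[Int]],   // cached RDD id -> its cached narrow-ancestor ids
    val curRunningRddMap: mutable.HashMap[Int, Set[Int]])  // cached RDDs produced by this stage

// Per-block bookkeeping, keyed by BlockId in the BlockManager's blockExInfo map.
class BlockExInfo(val blockId: BlockId) {
  var isExist: Int = 0                  // set to 1 once the block is materialized
  var creatStartTime: Long = 0L         // set when all cached parent blocks are present
  var size: Long = 0L                   // used by MemoryStore when picking eviction victims
  val sonSet: mutable.HashSet[BlockId] = mutable.HashSet.empty[BlockId]  // blocks waiting on this one
}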