diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index c3b770a42cba0df544d29bcb88ec11465a019039..ccd383396460254d5ba433d0c654004f8a5d5a23 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -28,7 +28,6 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.PipedRDD
-import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
 
@@ -271,16 +270,6 @@ private[spark] object PythonRDD {
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }
 
-  /**
-   * Returns the StorageLevel with the given string name.
-   * Throws an exception if the name is not a valid StorageLevel.
-   */
-  def getStorageLevelByName(name: String) : StorageLevel = {
-    // In Scala, "val MEMORY_ONLY" produces a public getter by the same name.
-    val storageLevelGetter = StorageLevel.getClass().getDeclaredMethod(name)
-    return storageLevelGetter.invoke(StorageLevel).asInstanceOf[StorageLevel]
-  }
-
   def writeIteratorToPickleFile[T](items: java.util.Iterator[T], filename: String) {
     import scala.collection.JavaConverters._
     writeIteratorToPickleFile(items.asScala, filename)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 514d56e2003103ac7646753145bcbca8905e6ea8..4c48cd3f3786467527385195032c2d02c597a1b7 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -47,6 +47,7 @@ class SparkContext(object):
     _active_spark_context = None
     _lock = Lock()
     _python_includes = None # zip and egg files that need to be added to PYTHONPATH
+    StorageLevel = None
 
     def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
         environment=None, batchSize=1024):
@@ -118,6 +119,29 @@ class SparkContext(object):
         self._temp_dir = \
             self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
 
+        self._initStorageLevel()
+
+    def _initStorageLevel(self):
+        """
+        Initializes the StorageLevel object, which mimics the behavior of the scala object
+        by the same name. e.g., StorageLevel.DISK_ONLY returns the equivalent Java StorageLevel.
+        """
+        newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
+        levels = {
+            'NONE': newStorageLevel(False, False, False, 1),
+            'DISK_ONLY': newStorageLevel(True, False, False, 1),
+            'DISK_ONLY_2': newStorageLevel(True, False, False, 2),
+            'MEMORY_ONLY': newStorageLevel(False, True, True, 1),
+            'MEMORY_ONLY_2': newStorageLevel(False, True, True, 2),
+            'MEMORY_ONLY_SER': newStorageLevel(False, True, False, 1),
+            'MEMORY_ONLY_SER_2': newStorageLevel(False, True, False, 2),
+            'MEMORY_AND_DISK': newStorageLevel(True, True, True, 1),
+            'MEMORY_AND_DISK_2': newStorageLevel(True, True, True, 2),
+            'MEMORY_AND_DISK_SER': newStorageLevel(True, True, False, 1),
+            'MEMORY_AND_DISK_SER_2': newStorageLevel(True, True, False, 2),
+        }
+        self.StorageLevel = type('StorageLevel', (), levels)
+
     @property
     def defaultParallelism(self):
         """
@@ -279,28 +303,6 @@ class SparkContext(object):
         """
         self._jsc.sc().setCheckpointDir(dirName, useExisting)
 
-class StorageLevelReader:
-    """
-    Mimics the Scala StorageLevel by delegating all attribute requests
-    (e.g., StorageLevel.DISK_ONLY) to the JVM for reflection.
-    Memoizes results to reduce JVM call/memory overheads.
-    """
-
-    def __init__(self, sc):
-        self.sc = sc
-        self.memoized = {}
-
-    def __getattr__(self, name):
-        if name in self.memoized:
-            return self.memoized[name]
-
-        try:
-            storageLevel = self.sc._jvm.PythonRDD.getStorageLevelByName(name)
-            self.memoized[name] = storageLevel
-            return storageLevel
-        except:
-            print "Failed to find StorageLevel:", name
-
 def _test():
     import atexit
     import doctest
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 9acc176d550091816c6f845a63684672f1e9c81b..e374ca4ee4028a79caaa0683833c9a19a3342bde 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -23,13 +23,13 @@ This file is designed to be launched as a PYTHONSTARTUP script.
 import os
 import platform
 import pyspark
-from pyspark.context import SparkContext, StorageLevelReader
+from pyspark.context import SparkContext
 
 # this is the equivalent of ADD_JARS
 add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") != None else None
 
 sc = SparkContext(os.environ.get("MASTER", "local"), "PySparkShell", pyFiles=add_files)
-StorageLevel = StorageLevelReader(sc)
+StorageLevel = sc.StorageLevel # alias StorageLevel to global scope
 
 print """Welcome to
       ____              __