Commit 8001687a authored by Aaron Davidson

Remove reflection, hard-code StorageLevels

The sc.StorageLevel -> StorageLevel pathway is a bit janky, but otherwise
the shell would have to call a private method of SparkContext. Having
StorageLevel available in sc also doesn't seem like the end of the world.
There may be a better solution, though.

As for creating the StorageLevel object itself, this seems to be the best
way in Python 2 for creating singleton, enum-like objects:
http://stackoverflow.com/questions/36932/how-can-i-represent-an-enum-in-python
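
For illustration, a minimal, self-contained sketch of that type()-based pattern (the integer values are placeholders for this sketch only; the actual change below maps each name to a JVM StorageLevel object obtained through the Py4J gateway):

    # Build an enum-like singleton class whose attributes come from a dict.
    levels = {'DISK_ONLY': 0, 'MEMORY_ONLY': 1, 'MEMORY_AND_DISK': 2}
    StorageLevel = type('StorageLevel', (), levels)
    print StorageLevel.MEMORY_ONLY  # members read like enum values -> 1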
parent b8a0b6ea
@@ -28,7 +28,6 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.PipedRDD
-import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
@@ -271,16 +270,6 @@ private[spark] object PythonRDD {
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }
 
-  /**
-   * Returns the StorageLevel with the given string name.
-   * Throws an exception if the name is not a valid StorageLevel.
-   */
-  def getStorageLevelByName(name: String) : StorageLevel = {
-    // In Scala, "val MEMORY_ONLY" produces a public getter by the same name.
-    val storageLevelGetter = StorageLevel.getClass().getDeclaredMethod(name)
-    return storageLevelGetter.invoke(StorageLevel).asInstanceOf[StorageLevel]
-  }
-
   def writeIteratorToPickleFile[T](items: java.util.Iterator[T], filename: String) {
     import scala.collection.JavaConverters._
     writeIteratorToPickleFile(items.asScala, filename)
@@ -47,6 +47,7 @@ class SparkContext(object):
     _active_spark_context = None
     _lock = Lock()
     _python_includes = None # zip and egg files that need to be added to PYTHONPATH
+    StorageLevel = None
 
     def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
         environment=None, batchSize=1024):
@@ -118,6 +119,29 @@ class SparkContext(object):
         self._temp_dir = \
             self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
 
+        self._initStorageLevel()
+
+    def _initStorageLevel(self):
+        """
+        Initializes the StorageLevel object, which mimics the behavior of the scala object
+        by the same name. e.g., StorageLevel.DISK_ONLY returns the equivalent Java StorageLevel.
+        """
+        newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
+        levels = {
+            'NONE': newStorageLevel(False, False, False, 1),
+            'DISK_ONLY': newStorageLevel(True, False, False, 1),
+            'DISK_ONLY_2': newStorageLevel(True, False, False, 2),
+            'MEMORY_ONLY': newStorageLevel(False, True, True, 1),
+            'MEMORY_ONLY_2': newStorageLevel(False, True, True, 2),
+            'MEMORY_ONLY_SER': newStorageLevel(False, True, False, 1),
+            'MEMORY_ONLY_SER_2': newStorageLevel(False, True, False, 2),
+            'MEMORY_AND_DISK': newStorageLevel(True, True, True, 1),
+            'MEMORY_AND_DISK_2': newStorageLevel(True, True, True, 2),
+            'MEMORY_AND_DISK_SER': newStorageLevel(True, True, False, 1),
+            'MEMORY_AND_DISK_SER_2': newStorageLevel(True, True, False, 2),
+        }
+        self.StorageLevel = type('StorageLevel', (), levels)
+
     @property
     def defaultParallelism(self):
         """
@@ -279,28 +303,6 @@ class SparkContext(object):
         """
         self._jsc.sc().setCheckpointDir(dirName, useExisting)
 
-class StorageLevelReader:
-    """
-    Mimics the Scala StorageLevel by delegating all attribute requests
-    (e.g., StorageLevel.DISK_ONLY) to the JVM for reflection.
-    Memoizes results to reduce JVM call/memory overheads.
-    """
-
-    def __init__(self, sc):
-        self.sc = sc
-        self.memoized = {}
-
-    def __getattr__(self, name):
-        if name in self.memoized:
-            return self.memoized[name]
-
-        try:
-            storageLevel = self.sc._jvm.PythonRDD.getStorageLevelByName(name)
-            self.memoized[name] = storageLevel
-            return storageLevel
-        except:
-            print "Failed to find StorageLevel:", name
-
 def _test():
     import atexit
     import doctest
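
With the change above, the storage levels hang directly off the context instead of being fetched reflectively from the JVM. A hedged usage sketch (it assumes a live gateway behind the SparkContext; the app name is arbitrary, and the reflective PythonRDD.getStorageLevelByName lookup used by the removed StorageLevelReader no longer exists after this commit):

    from pyspark.context import SparkContext

    sc = SparkContext("local", "StorageLevelDemo")
    level = sc.StorageLevel.MEMORY_AND_DISK_SER # a JVM StorageLevel(True, True, False, 1)
    print level.toString()                      # the object itself still lives on the Java side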
@@ -23,13 +23,13 @@ This file is designed to be launched as a PYTHONSTARTUP script.
 import os
 import platform
 import pyspark
-from pyspark.context import SparkContext, StorageLevelReader
+from pyspark.context import SparkContext
 
 # this is the equivalent of ADD_JARS
 add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") != None else None
 
 sc = SparkContext(os.environ.get("MASTER", "local"), "PySparkShell", pyFiles=add_files)
-StorageLevel = StorageLevelReader(sc)
+StorageLevel = sc.StorageLevel # alias StorageLevel to global scope
 
 print """Welcome to
       ____              __