diff --git a/dev/lint-python b/dev/lint-python new file mode 100755 index 0000000000000000000000000000000000000000..4efddad8393872b1f2fd123c91fc2bcba137306a --- /dev/null +++ b/dev/lint-python @@ -0,0 +1,60 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )" +SPARK_ROOT_DIR="$(dirname $SCRIPT_DIR)" +PEP8_REPORT_PATH="$SPARK_ROOT_DIR/dev/pep8-report.txt" + +cd $SPARK_ROOT_DIR + +# Get pep8 at runtime so that we don't rely on it being installed on the build server. +#+ See: https://github.com/apache/spark/pull/1744#issuecomment-50982162 +#+ TODOs: +#+ - Dynamically determine latest release version of pep8 and use that. +#+ - Download this from a more reliable source. (GitHub raw can be flaky, apparently. (?)) +PEP8_SCRIPT_PATH="$SPARK_ROOT_DIR/dev/pep8.py" +PEP8_SCRIPT_REMOTE_PATH="https://raw.githubusercontent.com/jcrocholl/pep8/1.5.7/pep8.py" + +curl --silent -o "$PEP8_SCRIPT_PATH" "$PEP8_SCRIPT_REMOTE_PATH" +curl_status=$? + +if [ $curl_status -ne 0 ]; then + echo "Failed to download pep8.py from \"$PEP8_SCRIPT_REMOTE_PATH\"." + exit $curl_status +fi + + +# There is no need to write this output to a file +#+ first, but we do so so that the check status can +#+ be output before the report, like with the +#+ scalastyle and RAT checks. +python $PEP8_SCRIPT_PATH ./python > "$PEP8_REPORT_PATH" +pep8_status=${PIPESTATUS[0]} #$? + +if [ $pep8_status -ne 0 ]; then + echo "PEP 8 checks failed." + cat "$PEP8_REPORT_PATH" +else + echo "PEP 8 checks passed." +fi + +rm -f "$PEP8_REPORT_PATH" +rm "$PEP8_SCRIPT_PATH" + +exit $pep8_status diff --git a/dev/lint-scala b/dev/lint-scala new file mode 100755 index 0000000000000000000000000000000000000000..c676dfdf4f44e89df99f426898865920a66b06b2 --- /dev/null +++ b/dev/lint-scala @@ -0,0 +1,23 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )" +SPARK_ROOT_DIR="$(dirname $SCRIPT_DIR)" + +"$SCRIPT_DIR/scalastyle" diff --git a/dev/run-tests b/dev/run-tests index d401c90f41d7bad597b702dcd249eb579e93a6fe..0e24515d1376cf789b255c152a29aa3f5338ee76 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -66,16 +66,25 @@ fi set -e set -o pipefail +echo "" echo "=========================================================================" echo "Running Apache RAT checks" echo "=========================================================================" dev/check-license +echo "" echo "=========================================================================" echo "Running Scala style checks" echo "=========================================================================" -dev/scalastyle +dev/lint-scala +echo "" +echo "=========================================================================" +echo "Running Python style checks" +echo "=========================================================================" +dev/lint-python + +echo "" echo "=========================================================================" echo "Running Spark unit tests" echo "=========================================================================" @@ -89,11 +98,13 @@ fi echo -e "q\n" | sbt/sbt $SBT_MAVEN_PROFILES_ARGS clean package assembly/assembly test | \ grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" +echo "" echo "=========================================================================" echo "Running PySpark tests" echo "=========================================================================" ./python/run-tests +echo "" echo "=========================================================================" echo "Detecting binary incompatibilites with MiMa" echo "=========================================================================" diff --git a/dev/scalastyle b/dev/scalastyle index d9f2b91a3a091a6e8a1ce3946c39c235fabcb19f..b53053a04ff421eefb323241e57de497bb19e175 100755 --- a/dev/scalastyle +++ b/dev/scalastyle @@ -30,5 +30,5 @@ if test ! -z "$ERRORS"; then echo -e "Scalastyle checks failed at following occurrences:\n$ERRORS" exit 1 else - echo -e "Scalastyle checks passed.\n" + echo -e "Scalastyle checks passed." fi diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index 45d36e5d0e764223956161101f86e8a4330b9a43..f133cf6f7befc1b91e9b57c5931848d7f489e368 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -110,6 +110,7 @@ def _deserialize_accumulator(aid, zero_value, accum_param): class Accumulator(object): + """ A shared variable that can be accumulated, i.e., has a commutative and associative "add" operation. Worker tasks on a Spark cluster can add values to an Accumulator with the C{+=} @@ -166,6 +167,7 @@ class Accumulator(object): class AccumulatorParam(object): + """ Helper object that defines how to accumulate values of a given type. """ @@ -186,6 +188,7 @@ class AccumulatorParam(object): class AddingAccumulatorParam(AccumulatorParam): + """ An AccumulatorParam that uses the + operators to add values. Designed for simple types such as integers, floats, and lists. Requires the zero value for the underlying type @@ -210,6 +213,7 @@ COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j) class _UpdateRequestHandler(SocketServer.StreamRequestHandler): + """ This handler will keep polling updates from the same socket until the server is shutdown. @@ -228,7 +232,9 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler): # Write a byte in acknowledgement self.wfile.write(struct.pack("!b", 1)) + class AccumulatorServer(SocketServer.TCPServer): + """ A simple TCP server that intercepts shutdown() in order to interrupt our continuous polling on the handler. @@ -239,6 +245,7 @@ class AccumulatorServer(SocketServer.TCPServer): self.server_shutdown = True SocketServer.TCPServer.shutdown(self) + def _start_update_server(): """Start a TCP server to receive accumulator updates in a daemon thread, and returns it""" server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler) diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index 43f40f8783bfd426cb94f031527cc11ee43059c7..f3e64989ed564457007b36c14610a9780b617a21 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -45,6 +45,7 @@ def _from_id(bid): class Broadcast(object): + """ A broadcast variable created with L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}. diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py index b4c82f519bd53605c98edfc66b5be42ab0081b30..fb716f6753a45aa5dca04f070e6f553e19d148f5 100644 --- a/python/pyspark/conf.py +++ b/python/pyspark/conf.py @@ -56,6 +56,7 @@ spark.home=/path class SparkConf(object): + """ Configuration for a Spark application. Used to set various Spark parameters as key-value pairs. diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 2e80eb50f2207626590c2c7ddeaf6f8ad93f7e30..4001ecab5ea00439b4eb068695b696c2cc7a9f05 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -47,6 +47,7 @@ DEFAULT_CONFIGS = { class SparkContext(object): + """ Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create L{RDD}s and @@ -213,7 +214,7 @@ class SparkContext(object): if instance: if (SparkContext._active_spark_context and - SparkContext._active_spark_context != instance): + SparkContext._active_spark_context != instance): currentMaster = SparkContext._active_spark_context.master currentAppName = SparkContext._active_spark_context.appName callsite = SparkContext._active_spark_context._callsite @@ -406,7 +407,7 @@ class SparkContext(object): batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass, - keyConverter, valueConverter, minSplits, batchSize) + keyConverter, valueConverter, minSplits, batchSize) return RDD(jrdd, self, ser) def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, @@ -437,7 +438,8 @@ class SparkContext(object): batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf, batchSize) + valueClass, keyConverter, valueConverter, + jconf, batchSize) return RDD(jrdd, self, ser) def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, @@ -465,7 +467,8 @@ class SparkContext(object): batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf, batchSize) + valueClass, keyConverter, valueConverter, + jconf, batchSize) return RDD(jrdd, self, ser) def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, @@ -496,7 +499,8 @@ class SparkContext(object): batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf, batchSize) + valueClass, keyConverter, valueConverter, + jconf, batchSize) return RDD(jrdd, self, ser) def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, @@ -523,8 +527,9 @@ class SparkContext(object): jconf = self._dictToJavaMap(conf) batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() - jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass, - keyConverter, valueConverter, jconf, batchSize) + jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, + jconf, batchSize) return RDD(jrdd, self, ser) def _checkpointFile(self, name, input_deserializer): @@ -555,8 +560,7 @@ class SparkContext(object): first = rdds[0]._jrdd rest = [x._jrdd for x in rdds[1:]] rest = ListConverter().convert(rest, self._gateway._gateway_client) - return RDD(self._jsc.union(first, rest), self, - rdds[0]._jrdd_deserializer) + return RDD(self._jsc.union(first, rest), self, rdds[0]._jrdd_deserializer) def broadcast(self, value): """ @@ -568,8 +572,7 @@ class SparkContext(object): pickleSer = PickleSerializer() pickled = pickleSer.dumps(value) jbroadcast = self._jsc.broadcast(bytearray(pickled)) - return Broadcast(jbroadcast.id(), value, jbroadcast, - self._pickled_broadcast_vars) + return Broadcast(jbroadcast.id(), value, jbroadcast, self._pickled_broadcast_vars) def accumulator(self, value, accum_param=None): """ diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index b00da833d06f1d2af89505694c716014b7bf5c9d..e73538baf0b93b8bb06f1e9e592e9c1940b86ab7 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -43,7 +43,7 @@ def worker(sock): """ # Redirect stdout to stderr os.dup2(2, 1) - sys.stdout = sys.stderr # The sys.stdout object is different from file descriptor 1 + sys.stdout = sys.stderr # The sys.stdout object is different from file descriptor 1 signal.signal(SIGHUP, SIG_DFL) signal.signal(SIGCHLD, SIG_DFL) @@ -134,8 +134,7 @@ def manager(): try: os.kill(worker_pid, signal.SIGKILL) except OSError: - pass # process already died - + pass # process already died if listen_sock in ready_fds: sock, addr = listen_sock.accept() diff --git a/python/pyspark/files.py b/python/pyspark/files.py index 57ee14eeb777667fa89518aa4d57c03f87edd0fb..331de9a9b2212be42592d0640716a4a2b7a834d3 100644 --- a/python/pyspark/files.py +++ b/python/pyspark/files.py @@ -19,6 +19,7 @@ import os class SparkFiles(object): + """ Resolves paths to files added through L{SparkContext.addFile()<pyspark.context.SparkContext.addFile>}. diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 2c129679f47f3d78b9664469b7854d1d3283ae66..37386ab0d7d490a2161ee936179fc27a1aac8ff0 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -65,6 +65,7 @@ def launch_gateway(): # Create a thread to echo output from the GatewayServer, which is required # for Java log output to show up: class EchoOutputThread(Thread): + def __init__(self, stream): Thread.__init__(self) self.daemon = True diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index 9c1565affbdac8b601ca4c7bd7b2e0bad0805504..db341da85f865a95fa94b813b7ea90a6a87998d7 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -72,9 +72,9 @@ except: # Python interpreter must agree on what endian the machine is. -DENSE_VECTOR_MAGIC = 1 +DENSE_VECTOR_MAGIC = 1 SPARSE_VECTOR_MAGIC = 2 -DENSE_MATRIX_MAGIC = 3 +DENSE_MATRIX_MAGIC = 3 LABELED_POINT_MAGIC = 4 @@ -443,6 +443,7 @@ def _serialize_rating(r): class RatingDeserializer(Serializer): + def loads(self, stream): length = struct.unpack("!i", stream.read(4))[0] ba = stream.read(length) diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 5ec1a8084d2690de84ca92575433cef35ba40e5b..ffdda7ee19302cc85158cec06f8fd6a284b43fd0 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -31,6 +31,7 @@ from math import exp, log class LogisticRegressionModel(LinearModel): + """A linear binary classification model derived from logistic regression. >>> data = [ @@ -60,6 +61,7 @@ class LogisticRegressionModel(LinearModel): >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0 True """ + def predict(self, x): _linear_predictor_typecheck(x, self._coeff) margin = _dot(x, self._coeff) + self._intercept @@ -72,6 +74,7 @@ class LogisticRegressionModel(LinearModel): class LogisticRegressionWithSGD(object): + @classmethod def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWeights=None, regParam=1.0, regType=None, intercept=False): @@ -108,6 +111,7 @@ class LogisticRegressionWithSGD(object): class SVMModel(LinearModel): + """A support vector machine. >>> data = [ @@ -131,6 +135,7 @@ class SVMModel(LinearModel): >>> svm.predict(SparseVector(2, {0: -1.0})) <= 0 True """ + def predict(self, x): _linear_predictor_typecheck(x, self._coeff) margin = _dot(x, self._coeff) + self._intercept @@ -138,6 +143,7 @@ class SVMModel(LinearModel): class SVMWithSGD(object): + @classmethod def train(cls, data, iterations=100, step=1.0, regParam=1.0, miniBatchFraction=1.0, initialWeights=None, regType=None, intercept=False): @@ -173,6 +179,7 @@ class SVMWithSGD(object): class NaiveBayesModel(object): + """ Model for Naive Bayes classifiers. @@ -213,6 +220,7 @@ class NaiveBayesModel(object): class NaiveBayes(object): + @classmethod def train(cls, data, lambda_=1.0): """ diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index b380e8f6c8725f0aaadf74fd2f82e93b6aad444a..a0630d1d5c58b6e502f8365b3370f9b64d612ba7 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -27,6 +27,7 @@ from pyspark.mllib.linalg import SparseVector class KMeansModel(object): + """A clustering model derived from the k-means method. >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2) @@ -55,6 +56,7 @@ class KMeansModel(object): >>> type(model.clusterCenters) <type 'list'> """ + def __init__(self, centers): self.centers = centers @@ -76,6 +78,7 @@ class KMeansModel(object): class KMeans(object): + @classmethod def train(cls, data, k, maxIterations=100, runs=1, initializationMode="k-means||"): """Train a k-means clustering model.""" diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 54720c2324ca669d79e75b77cfb83dec2db8a948..9a239abfbbeb1d6c54ab72aa70f7fc341ec2fba2 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -27,6 +27,7 @@ from numpy import array, array_equal, ndarray, float64, int32 class SparseVector(object): + """ A simple sparse vector class for passing data to MLlib. Users may alternatively pass SciPy's {scipy.sparse} data types. @@ -192,6 +193,7 @@ class SparseVector(object): class Vectors(object): + """ Factory methods for working with vectors. Note that dense vectors are simply represented as NumPy array objects, so there is no need diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py index 36e710dbae7a84f025850a89e6ae193094572578..eb496688b6eefc66a4150944c92ff8c481f07be7 100644 --- a/python/pyspark/mllib/random.py +++ b/python/pyspark/mllib/random.py @@ -24,7 +24,9 @@ from pyspark.rdd import RDD from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector from pyspark.serializers import NoOpSerializer + class RandomRDDGenerators: + """ Generator methods for creating RDDs comprised of i.i.d samples from some distribution. @@ -53,7 +55,7 @@ class RandomRDDGenerators: True """ jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed) - uniform = RDD(jrdd, sc, NoOpSerializer()) + uniform = RDD(jrdd, sc, NoOpSerializer()) return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes))) @staticmethod @@ -77,7 +79,7 @@ class RandomRDDGenerators: True """ jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed) - normal = RDD(jrdd, sc, NoOpSerializer()) + normal = RDD(jrdd, sc, NoOpSerializer()) return normal.map(lambda bytes: _deserialize_double(bytearray(bytes))) @staticmethod @@ -98,7 +100,7 @@ class RandomRDDGenerators: True """ jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed) - poisson = RDD(jrdd, sc, NoOpSerializer()) + poisson = RDD(jrdd, sc, NoOpSerializer()) return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes))) @staticmethod @@ -118,7 +120,7 @@ class RandomRDDGenerators: """ jrdd = sc._jvm.PythonMLLibAPI() \ .uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) - uniform = RDD(jrdd, sc, NoOpSerializer()) + uniform = RDD(jrdd, sc, NoOpSerializer()) return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) @staticmethod @@ -138,7 +140,7 @@ class RandomRDDGenerators: """ jrdd = sc._jvm.PythonMLLibAPI() \ .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) - normal = RDD(jrdd, sc, NoOpSerializer()) + normal = RDD(jrdd, sc, NoOpSerializer()) return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) @staticmethod @@ -161,7 +163,7 @@ class RandomRDDGenerators: """ jrdd = sc._jvm.PythonMLLibAPI() \ .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed) - poisson = RDD(jrdd, sc, NoOpSerializer()) + poisson = RDD(jrdd, sc, NoOpSerializer()) return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 6c385042ffa5f7433b449ee611eae5fe06048e4d..e863fc249ec3649fcd194c694ba687162f6941b6 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -26,6 +26,7 @@ from pyspark.rdd import RDD class MatrixFactorizationModel(object): + """A matrix factorisation model trained by regularized alternating least-squares. @@ -58,6 +59,7 @@ class MatrixFactorizationModel(object): class ALS(object): + @classmethod def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1): sc = ratings.context diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 041b119269427f6aafd906fee7720b01975e8c93..d8792cf44872f005cbcc3b2c09b0541c72843e05 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -27,6 +27,7 @@ from pyspark.mllib.linalg import SparseVector, Vectors class LabeledPoint(object): + """ The features and labels of a data point. @@ -34,6 +35,7 @@ class LabeledPoint(object): @param features: Vector of features for this point (NumPy array, list, pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix) """ + def __init__(self, label, features): self.label = label if (type(features) == ndarray or type(features) == SparseVector @@ -49,7 +51,9 @@ class LabeledPoint(object): class LinearModel(object): + """A linear model that has a vector of coefficients and an intercept.""" + def __init__(self, weights, intercept): self._coeff = weights self._intercept = intercept @@ -64,6 +68,7 @@ class LinearModel(object): class LinearRegressionModelBase(LinearModel): + """A linear regression model. >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1) @@ -72,6 +77,7 @@ class LinearRegressionModelBase(LinearModel): >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6 True """ + def predict(self, x): """Predict the value of the dependent variable given a vector x""" """containing values for the independent variables.""" @@ -80,6 +86,7 @@ class LinearRegressionModelBase(LinearModel): class LinearRegressionModel(LinearRegressionModelBase): + """A linear regression model derived from a least-squares fit. >>> from pyspark.mllib.regression import LabeledPoint @@ -111,6 +118,7 @@ class LinearRegressionModel(LinearRegressionModelBase): class LinearRegressionWithSGD(object): + @classmethod def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWeights=None, regParam=1.0, regType=None, intercept=False): @@ -146,6 +154,7 @@ class LinearRegressionWithSGD(object): class LassoModel(LinearRegressionModelBase): + """A linear regression model derived from a least-squares fit with an l_1 penalty term. @@ -178,6 +187,7 @@ class LassoModel(LinearRegressionModelBase): class LassoWithSGD(object): + @classmethod def train(cls, data, iterations=100, step=1.0, regParam=1.0, miniBatchFraction=1.0, initialWeights=None): @@ -189,6 +199,7 @@ class LassoWithSGD(object): class RidgeRegressionModel(LinearRegressionModelBase): + """A linear regression model derived from a least-squares fit with an l_2 penalty term. @@ -221,6 +232,7 @@ class RidgeRegressionModel(LinearRegressionModelBase): class RidgeRegressionWithSGD(object): + @classmethod def train(cls, data, iterations=100, step=1.0, regParam=1.0, miniBatchFraction=1.0, initialWeights=None): diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py index 0a08a562d1f1f31829cda165366d68b82eef3887..982906b9d09f07c3f53439289d639c86cd0a01d3 100644 --- a/python/pyspark/mllib/stat.py +++ b/python/pyspark/mllib/stat.py @@ -24,6 +24,7 @@ from pyspark.mllib._common import \ _serialize_double, _serialize_double_vector, \ _deserialize_double, _deserialize_double_matrix + class Statistics(object): @staticmethod diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 9d1e5be637a9ad195978ed42748b01ce22402aba..6f3ec8ac94bacddc4675e01982fa89fd9d6eeb21 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -39,6 +39,7 @@ except: class VectorTests(unittest.TestCase): + def test_serialize(self): sv = SparseVector(4, {1: 1, 3: 2}) dv = array([1., 2., 3., 4.]) @@ -81,6 +82,7 @@ class VectorTests(unittest.TestCase): class ListTests(PySparkTestCase): + """ Test MLlib algorithms on plain lists, to make sure they're passed through as NumPy arrays. @@ -128,7 +130,7 @@ class ListTests(PySparkTestCase): self.assertTrue(nb_model.predict(features[2]) <= 0) self.assertTrue(nb_model.predict(features[3]) > 0) - categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories + categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories dt_model = \ DecisionTree.trainClassifier(rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo) @@ -168,7 +170,7 @@ class ListTests(PySparkTestCase): self.assertTrue(rr_model.predict(features[2]) <= 0) self.assertTrue(rr_model.predict(features[3]) > 0) - categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories + categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories dt_model = \ DecisionTree.trainRegressor(rdd, categoricalFeaturesInfo=categoricalFeaturesInfo) self.assertTrue(dt_model.predict(features[0]) <= 0) @@ -179,6 +181,7 @@ class ListTests(PySparkTestCase): @unittest.skipIf(not _have_scipy, "SciPy not installed") class SciPyTests(PySparkTestCase): + """ Test both vector operations and MLlib algorithms with SciPy sparse matrices, if SciPy is available. @@ -276,7 +279,7 @@ class SciPyTests(PySparkTestCase): self.assertTrue(nb_model.predict(features[2]) <= 0) self.assertTrue(nb_model.predict(features[3]) > 0) - categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories + categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories dt_model = DecisionTree.trainClassifier(rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo) self.assertTrue(dt_model.predict(features[0]) <= 0) @@ -315,7 +318,7 @@ class SciPyTests(PySparkTestCase): self.assertTrue(rr_model.predict(features[2]) <= 0) self.assertTrue(rr_model.predict(features[3]) > 0) - categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories + categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories dt_model = DecisionTree.trainRegressor(rdd, categoricalFeaturesInfo=categoricalFeaturesInfo) self.assertTrue(dt_model.predict(features[0]) <= 0) self.assertTrue(dt_model.predict(features[1]) > 0) diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py index 1e0006df75ac62f8e66438e61d87d49c21b76c99..2518001ea0b93b85be339751c570332ddcf28c1b 100644 --- a/python/pyspark/mllib/tree.py +++ b/python/pyspark/mllib/tree.py @@ -25,7 +25,9 @@ from pyspark.mllib._common import \ from pyspark.mllib.regression import LabeledPoint from pyspark.serializers import NoOpSerializer + class DecisionTreeModel(object): + """ A decision tree model for classification or regression. @@ -77,6 +79,7 @@ class DecisionTreeModel(object): class DecisionTree(object): + """ Learning algorithm for a decision tree model for classification or regression. @@ -174,7 +177,6 @@ class DecisionTree(object): categoricalFeaturesInfo, impurity, maxDepth, maxBins) - @staticmethod def train(data, algo, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins=100): diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index 639cda63502291c0d44de5968b66ae068ac100ab..4962d05491c039c80a8e3b0f8ee32ad19943f82f 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -26,6 +26,7 @@ from pyspark.serializers import NoOpSerializer class MLUtils: + """ Helper methods to load, save and pre-process data used in MLlib. """ diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 309f5a9b6038d4892f51504dcbff6d99c37e30bd..30b834d2085cd8c78bbce976598afbb86d361720 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -233,7 +233,7 @@ class RDD(object): def _toPickleSerialization(self): if (self._jrdd_deserializer == PickleSerializer() or - self._jrdd_deserializer == BatchedSerializer(PickleSerializer())): + self._jrdd_deserializer == BatchedSerializer(PickleSerializer())): return self else: return self._reserialize(BatchedSerializer(PickleSerializer(), 10)) @@ -1079,7 +1079,9 @@ class RDD(object): pickledRDD = self._toPickleSerialization() batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path, - outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf) + outputFormatClass, + keyClass, valueClass, + keyConverter, valueConverter, jconf) def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None): """ @@ -1125,8 +1127,10 @@ class RDD(object): pickledRDD = self._toPickleSerialization() batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path, - outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, - jconf, compressionCodecClass) + outputFormatClass, + keyClass, valueClass, + keyConverter, valueConverter, + jconf, compressionCodecClass) def saveAsSequenceFile(self, path, compressionCodecClass=None): """ @@ -1348,7 +1352,7 @@ class RDD(object): outputSerializer = self.ctx._unbatched_serializer limit = (_parse_memory(self.ctx._conf.get( - "spark.python.worker.memory", "512m")) / 2) + "spark.python.worker.memory", "512m")) / 2) def add_shuffle_key(split, iterator): @@ -1430,12 +1434,12 @@ class RDD(object): spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == 'true') memory = _parse_memory(self.ctx._conf.get( - "spark.python.worker.memory", "512m")) + "spark.python.worker.memory", "512m")) agg = Aggregator(createCombiner, mergeValue, mergeCombiners) def combineLocally(iterator): merger = ExternalMerger(agg, memory * 0.9, serializer) \ - if spill else InMemoryMerger(agg) + if spill else InMemoryMerger(agg) merger.mergeValues(iterator) return merger.iteritems() @@ -1444,7 +1448,7 @@ class RDD(object): def _mergeCombiners(iterator): merger = ExternalMerger(agg, memory, serializer) \ - if spill else InMemoryMerger(agg) + if spill else InMemoryMerger(agg) merger.mergeCombiners(iterator) return merger.iteritems() @@ -1588,7 +1592,7 @@ class RDD(object): """ for fraction in fractions.values(): assert fraction >= 0.0, "Negative fraction value: %s" % fraction - return self.mapPartitionsWithIndex( \ + return self.mapPartitionsWithIndex( RDDStratifiedSampler(withReplacement, fractions, seed).func, True) def subtractByKey(self, other, numPartitions=None): diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 2df000fdb08cac2491f3fa1a722be15ea2db4a8a..55e247da0e4dc64a654eaace265652620f50a1da 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -20,6 +20,7 @@ import random class RDDSamplerBase(object): + def __init__(self, withReplacement, seed=None): try: import numpy @@ -95,6 +96,7 @@ class RDDSamplerBase(object): class RDDSampler(RDDSamplerBase): + def __init__(self, withReplacement, fraction, seed=None): RDDSamplerBase.__init__(self, withReplacement, seed) self._fraction = fraction @@ -113,7 +115,9 @@ class RDDSampler(RDDSamplerBase): if self.getUniformSample(split) <= self._fraction: yield obj + class RDDStratifiedSampler(RDDSamplerBase): + def __init__(self, withReplacement, fractions, seed=None): RDDSamplerBase.__init__(self, withReplacement, seed) self._fractions = fractions diff --git a/python/pyspark/resultiterable.py b/python/pyspark/resultiterable.py index df34740fc8176f1cb8d15523d9c89456ebfca042..ef04c82866e6c5d91bc2ebcbf3b681ba0e8e97ff 100644 --- a/python/pyspark/resultiterable.py +++ b/python/pyspark/resultiterable.py @@ -21,9 +21,11 @@ import collections class ResultIterable(collections.Iterable): + """ A special result iterable. This is used because the standard iterator can not be pickled """ + def __init__(self, data): self.data = data self.index = 0 diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index a10f85b55ad301e8874c82c3249f4ab1ce3cf94f..b35558db3e0071a078fb8bfcc504bfaccb536032 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -111,6 +111,7 @@ class Serializer(object): class FramedSerializer(Serializer): + """ Serializer that writes objects as a stream of (length, data) pairs, where C{length} is a 32-bit integer and data is C{length} bytes. @@ -162,6 +163,7 @@ class FramedSerializer(Serializer): class BatchedSerializer(Serializer): + """ Serializes a stream of objects in batches by calling its wrapped Serializer with streams of objects. @@ -207,6 +209,7 @@ class BatchedSerializer(Serializer): class CartesianDeserializer(FramedSerializer): + """ Deserializes the JavaRDD cartesian() of two PythonRDDs. """ @@ -240,6 +243,7 @@ class CartesianDeserializer(FramedSerializer): class PairDeserializer(CartesianDeserializer): + """ Deserializes the JavaRDD zip() of two PythonRDDs. """ @@ -289,6 +293,7 @@ def _hack_namedtuple(cls): """ Make class generated by namedtuple picklable """ name = cls.__name__ fields = cls._fields + def __reduce__(self): return (_restore, (name, fields, tuple(self))) cls.__reduce__ = __reduce__ @@ -301,10 +306,11 @@ def _hijack_namedtuple(): if hasattr(collections.namedtuple, "__hijack"): return - global _old_namedtuple # or it will put in closure + global _old_namedtuple # or it will put in closure + def _copy_func(f): return types.FunctionType(f.func_code, f.func_globals, f.func_name, - f.func_defaults, f.func_closure) + f.func_defaults, f.func_closure) _old_namedtuple = _copy_func(collections.namedtuple) @@ -323,15 +329,16 @@ def _hijack_namedtuple(): # so only hack those in __main__ module for n, o in sys.modules["__main__"].__dict__.iteritems(): if (type(o) is type and o.__base__ is tuple - and hasattr(o, "_fields") - and "__reduce__" not in o.__dict__): - _hack_namedtuple(o) # hack inplace + and hasattr(o, "_fields") + and "__reduce__" not in o.__dict__): + _hack_namedtuple(o) # hack inplace _hijack_namedtuple() class PickleSerializer(FramedSerializer): + """ Serializes objects using Python's cPickle serializer: @@ -354,6 +361,7 @@ class CloudPickleSerializer(PickleSerializer): class MarshalSerializer(FramedSerializer): + """ Serializes objects using Python's Marshal serializer: @@ -367,9 +375,11 @@ class MarshalSerializer(FramedSerializer): class AutoSerializer(FramedSerializer): + """ Choose marshal or cPickle as serialization protocol autumatically """ + def __init__(self): FramedSerializer.__init__(self) self._type = None @@ -394,6 +404,7 @@ class AutoSerializer(FramedSerializer): class UTF8Deserializer(Serializer): + """ Deserializes streams written by String.getBytes. """ diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index e3923d1c36c57c3cfb6ac439dd3e22bc8b830494..2c68cd4921deb893637101c0dc5839a18ef73dcb 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -45,7 +45,7 @@ except ImportError: return int(line.split()[1]) >> 10 else: warnings.warn("Please install psutil to have better " - "support with spilling") + "support with spilling") if platform.system() == "Darwin": import resource rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss @@ -141,7 +141,7 @@ class ExternalMerger(Merger): This class works as follows: - - It repeatedly combine the items and save them in one dict in + - It repeatedly combine the items and save them in one dict in memory. - When the used memory goes above memory limit, it will split @@ -190,12 +190,12 @@ class ExternalMerger(Merger): MAX_TOTAL_PARTITIONS = 4096 def __init__(self, aggregator, memory_limit=512, serializer=None, - localdirs=None, scale=1, partitions=59, batch=1000): + localdirs=None, scale=1, partitions=59, batch=1000): Merger.__init__(self, aggregator) self.memory_limit = memory_limit # default serializer is only used for tests self.serializer = serializer or \ - BatchedSerializer(PickleSerializer(), 1024) + BatchedSerializer(PickleSerializer(), 1024) self.localdirs = localdirs or self._get_dirs() # number of partitions when spill data into disks self.partitions = partitions @@ -341,7 +341,7 @@ class ExternalMerger(Merger): self.pdata[i].clear() self.spills += 1 - gc.collect() # release the memory as much as possible + gc.collect() # release the memory as much as possible def iteritems(self): """ Return all merged items as iterator """ @@ -370,8 +370,8 @@ class ExternalMerger(Merger): if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS and j < self.spills - 1 and get_used_memory() > hard_limit): - self.data.clear() # will read from disk again - gc.collect() # release the memory as much as possible + self.data.clear() # will read from disk again + gc.collect() # release the memory as much as possible for v in self._recursive_merged_items(i): yield v return @@ -409,9 +409,9 @@ class ExternalMerger(Merger): for i in range(start, self.partitions): subdirs = [os.path.join(d, "parts", str(i)) - for d in self.localdirs] + for d in self.localdirs] m = ExternalMerger(self.agg, self.memory_limit, self.serializer, - subdirs, self.scale * self.partitions) + subdirs, self.scale * self.partitions) m.pdata = [{} for _ in range(self.partitions)] limit = self._next_limit() @@ -419,7 +419,7 @@ class ExternalMerger(Merger): path = self._get_spill_dir(j) p = os.path.join(path, str(i)) m._partitioned_mergeCombiners( - self.serializer.load_stream(open(p))) + self.serializer.load_stream(open(p))) if get_used_memory() > limit: m._spill() diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index adc56e7ec0e2b4d75e4add1c76a2eb93214771a5..950e275adbf01d4c2c044979428251b57eff0e4c 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -45,6 +45,7 @@ __all__ = [ class DataType(object): + """Spark SQL DataType""" def __repr__(self): @@ -62,6 +63,7 @@ class DataType(object): class PrimitiveTypeSingleton(type): + """Metaclass for PrimitiveType""" _instances = {} @@ -73,6 +75,7 @@ class PrimitiveTypeSingleton(type): class PrimitiveType(DataType): + """Spark SQL PrimitiveType""" __metaclass__ = PrimitiveTypeSingleton @@ -83,6 +86,7 @@ class PrimitiveType(DataType): class StringType(PrimitiveType): + """Spark SQL StringType The data type representing string values. @@ -90,6 +94,7 @@ class StringType(PrimitiveType): class BinaryType(PrimitiveType): + """Spark SQL BinaryType The data type representing bytearray values. @@ -97,6 +102,7 @@ class BinaryType(PrimitiveType): class BooleanType(PrimitiveType): + """Spark SQL BooleanType The data type representing bool values. @@ -104,6 +110,7 @@ class BooleanType(PrimitiveType): class TimestampType(PrimitiveType): + """Spark SQL TimestampType The data type representing datetime.datetime values. @@ -111,6 +118,7 @@ class TimestampType(PrimitiveType): class DecimalType(PrimitiveType): + """Spark SQL DecimalType The data type representing decimal.Decimal values. @@ -118,6 +126,7 @@ class DecimalType(PrimitiveType): class DoubleType(PrimitiveType): + """Spark SQL DoubleType The data type representing float values. @@ -125,6 +134,7 @@ class DoubleType(PrimitiveType): class FloatType(PrimitiveType): + """Spark SQL FloatType The data type representing single precision floating-point values. @@ -132,6 +142,7 @@ class FloatType(PrimitiveType): class ByteType(PrimitiveType): + """Spark SQL ByteType The data type representing int values with 1 singed byte. @@ -139,6 +150,7 @@ class ByteType(PrimitiveType): class IntegerType(PrimitiveType): + """Spark SQL IntegerType The data type representing int values. @@ -146,6 +158,7 @@ class IntegerType(PrimitiveType): class LongType(PrimitiveType): + """Spark SQL LongType The data type representing long values. If the any value is @@ -155,6 +168,7 @@ class LongType(PrimitiveType): class ShortType(PrimitiveType): + """Spark SQL ShortType The data type representing int values with 2 signed bytes. @@ -162,6 +176,7 @@ class ShortType(PrimitiveType): class ArrayType(DataType): + """Spark SQL ArrayType The data type representing list values. An ArrayType object @@ -187,10 +202,11 @@ class ArrayType(DataType): def __str__(self): return "ArrayType(%s,%s)" % (self.elementType, - str(self.containsNull).lower()) + str(self.containsNull).lower()) class MapType(DataType): + """Spark SQL MapType The data type representing dict values. A MapType object comprises @@ -226,10 +242,11 @@ class MapType(DataType): def __repr__(self): return "MapType(%s,%s,%s)" % (self.keyType, self.valueType, - str(self.valueContainsNull).lower()) + str(self.valueContainsNull).lower()) class StructField(DataType): + """Spark SQL StructField Represents a field in a StructType. @@ -263,10 +280,11 @@ class StructField(DataType): def __repr__(self): return "StructField(%s,%s,%s)" % (self.name, self.dataType, - str(self.nullable).lower()) + str(self.nullable).lower()) class StructType(DataType): + """Spark SQL StructType The data type representing rows. @@ -291,7 +309,7 @@ class StructType(DataType): def __repr__(self): return ("StructType(List(%s))" % - ",".join(str(field) for field in self.fields)) + ",".join(str(field) for field in self.fields)) def _parse_datatype_list(datatype_list_string): @@ -319,7 +337,7 @@ def _parse_datatype_list(datatype_list_string): _all_primitive_types = dict((k, v) for k, v in globals().iteritems() - if type(v) is PrimitiveTypeSingleton and v.__base__ == PrimitiveType) + if type(v) is PrimitiveTypeSingleton and v.__base__ == PrimitiveType) def _parse_datatype_string(datatype_string): @@ -459,16 +477,16 @@ def _infer_schema(row): items = sorted(row.items()) elif isinstance(row, tuple): - if hasattr(row, "_fields"): # namedtuple + if hasattr(row, "_fields"): # namedtuple items = zip(row._fields, tuple(row)) - elif hasattr(row, "__FIELDS__"): # Row + elif hasattr(row, "__FIELDS__"): # Row items = zip(row.__FIELDS__, tuple(row)) elif all(isinstance(x, tuple) and len(x) == 2 for x in row): items = row else: raise ValueError("Can't infer schema from tuple") - elif hasattr(row, "__dict__"): # object + elif hasattr(row, "__dict__"): # object items = sorted(row.__dict__.items()) else: @@ -499,7 +517,7 @@ def _create_converter(obj, dataType): conv = lambda o: tuple(o.get(n) for n in names) elif isinstance(obj, tuple): - if hasattr(obj, "_fields"): # namedtuple + if hasattr(obj, "_fields"): # namedtuple conv = tuple elif hasattr(obj, "__FIELDS__"): conv = tuple @@ -508,7 +526,7 @@ def _create_converter(obj, dataType): else: raise ValueError("unexpected tuple") - elif hasattr(obj, "__dict__"): # object + elif hasattr(obj, "__dict__"): # object conv = lambda o: [o.__dict__.get(n, None) for n in names] nested = any(_has_struct(f.dataType) for f in dataType.fields) @@ -660,7 +678,7 @@ def _infer_schema_type(obj, dataType): assert len(fs) == len(obj), \ "Obj(%s) have different length with fields(%s)" % (obj, fs) fields = [StructField(f.name, _infer_schema_type(o, f.dataType), True) - for o, f in zip(obj, fs)] + for o, f in zip(obj, fs)] return StructType(fields) else: @@ -683,6 +701,7 @@ _acceptable_types = { StructType: (tuple, list), } + def _verify_type(obj, dataType): """ Verify the type of obj against dataType, raise an exception if @@ -728,7 +747,7 @@ def _verify_type(obj, dataType): elif isinstance(dataType, StructType): if len(obj) != len(dataType.fields): raise ValueError("Length of object (%d) does not match with" - "length of fields (%d)" % (len(obj), len(dataType.fields))) + "length of fields (%d)" % (len(obj), len(dataType.fields))) for v, f in zip(obj, dataType.fields): _verify_type(v, f.dataType) @@ -861,6 +880,7 @@ def _create_cls(dataType): raise Exception("unexpected data type: %s" % dataType) class Row(tuple): + """ Row in SchemaRDD """ __DATATYPE__ = dataType __FIELDS__ = tuple(f.name for f in dataType.fields) @@ -872,7 +892,7 @@ def _create_cls(dataType): def __repr__(self): # call collect __repr__ for nested objects return ("Row(%s)" % ", ".join("%s=%r" % (n, getattr(self, n)) - for n in self.__FIELDS__)) + for n in self.__FIELDS__)) def __reduce__(self): return (_restore_object, (self.__DATATYPE__, tuple(self))) @@ -881,6 +901,7 @@ def _create_cls(dataType): class SQLContext: + """Main entry point for SparkSQL functionality. A SQLContext can be used create L{SchemaRDD}s, register L{SchemaRDD}s as @@ -960,7 +981,7 @@ class SQLContext: env = MapConverter().convert(self._sc.environment, self._sc._gateway._gateway_client) includes = ListConverter().convert(self._sc._python_includes, - self._sc._gateway._gateway_client) + self._sc._gateway._gateway_client) self._ssql_ctx.registerPython(name, bytearray(CloudPickleSerializer().dumps(command)), env, @@ -1012,7 +1033,7 @@ class SQLContext: first = rdd.first() if not first: raise ValueError("The first row in RDD is empty, " - "can not infer schema") + "can not infer schema") if type(first) is dict: warnings.warn("Using RDD of dict to inferSchema is deprecated") @@ -1287,6 +1308,7 @@ class SQLContext: class HiveContext(SQLContext): + """A variant of Spark SQL that integrates with data stored in Hive. Configuration for Hive is read from hive-site.xml on the classpath. @@ -1327,6 +1349,7 @@ class HiveContext(SQLContext): class LocalHiveContext(HiveContext): + """Starts up an instance of hive where metadata is stored locally. An in-process metadata data is created with data stored in ./metadata. @@ -1357,7 +1380,7 @@ class LocalHiveContext(HiveContext): def __init__(self, sparkContext, sqlContext=None): HiveContext.__init__(self, sparkContext, sqlContext) warnings.warn("LocalHiveContext is deprecated. " - "Use HiveContext instead.", DeprecationWarning) + "Use HiveContext instead.", DeprecationWarning) def _get_hive_ctx(self): return self._jvm.LocalHiveContext(self._jsc.sc()) @@ -1376,6 +1399,7 @@ def _create_row(fields, values): class Row(tuple): + """ A row in L{SchemaRDD}. The fields in it can be accessed like attributes. @@ -1417,7 +1441,6 @@ class Row(tuple): else: raise ValueError("No args or kwargs") - # let obect acs like class def __call__(self, *args): """create new Row object""" @@ -1443,12 +1466,13 @@ class Row(tuple): def __repr__(self): if hasattr(self, "__FIELDS__"): return "Row(%s)" % ", ".join("%s=%r" % (k, v) - for k, v in zip(self.__FIELDS__, self)) + for k, v in zip(self.__FIELDS__, self)) else: return "<Row(%s)>" % ", ".join(self) class SchemaRDD(RDD): + """An RDD of L{Row} objects that has an associated schema. The underlying JVM object is a SchemaRDD, not a PythonRDD, so we can @@ -1659,7 +1683,7 @@ class SchemaRDD(RDD): rdd = self._jschema_rdd.subtract(other._jschema_rdd) else: rdd = self._jschema_rdd.subtract(other._jschema_rdd, - numPartitions) + numPartitions) return SchemaRDD(rdd, self.sql_ctx) else: raise ValueError("Can only subtract another SchemaRDD") @@ -1686,9 +1710,9 @@ def _test(): jsonStrings = [ '{"field1": 1, "field2": "row1", "field3":{"field4":11}}', '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},' - '"field6":[{"field7": "row2"}]}', + '"field6":[{"field7": "row2"}]}', '{"field1" : null, "field2": "row3", ' - '"field3":{"field4":33, "field5": []}}' + '"field3":{"field4":33, "field5": []}}' ] globs['jsonStrings'] = jsonStrings globs['json'] = sc.parallelize(jsonStrings) diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py index 5d77a131f28562d1372b08ad8947f3850ac150ec..2aa0fb9d2c1ed580204e9c0f344e45e1cb2b850c 100644 --- a/python/pyspark/storagelevel.py +++ b/python/pyspark/storagelevel.py @@ -19,6 +19,7 @@ __all__ = ["StorageLevel"] class StorageLevel: + """ Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 4ac94ba729d35fc3b6f767c6b5daf670695c9c3b..88a61176e51abefb276faf7b9efac25c9e08ddaa 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -62,53 +62,53 @@ class TestMerger(unittest.TestCase): self.N = 1 << 16 self.l = [i for i in xrange(self.N)] self.data = zip(self.l, self.l) - self.agg = Aggregator(lambda x: [x], - lambda x, y: x.append(y) or x, - lambda x, y: x.extend(y) or x) + self.agg = Aggregator(lambda x: [x], + lambda x, y: x.append(y) or x, + lambda x, y: x.extend(y) or x) def test_in_memory(self): m = InMemoryMerger(self.agg) m.mergeValues(self.data) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), - sum(xrange(self.N))) + sum(xrange(self.N))) m = InMemoryMerger(self.agg) m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data)) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), - sum(xrange(self.N))) + sum(xrange(self.N))) def test_small_dataset(self): m = ExternalMerger(self.agg, 1000) m.mergeValues(self.data) self.assertEqual(m.spills, 0) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), - sum(xrange(self.N))) + sum(xrange(self.N))) m = ExternalMerger(self.agg, 1000) m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data)) self.assertEqual(m.spills, 0) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), - sum(xrange(self.N))) + sum(xrange(self.N))) def test_medium_dataset(self): m = ExternalMerger(self.agg, 10) m.mergeValues(self.data) self.assertTrue(m.spills >= 1) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), - sum(xrange(self.N))) + sum(xrange(self.N))) m = ExternalMerger(self.agg, 10) m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data * 3)) self.assertTrue(m.spills >= 1) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), - sum(xrange(self.N)) * 3) + sum(xrange(self.N)) * 3) def test_huge_dataset(self): m = ExternalMerger(self.agg, 10) m.mergeCombiners(map(lambda (k, v): (k, [str(v)]), self.data * 10)) self.assertTrue(m.spills >= 1) self.assertEqual(sum(len(v) for k, v in m._recursive_merged_items(0)), - self.N * 10) + self.N * 10) m._cleanup() @@ -188,6 +188,7 @@ class TestAddFile(PySparkTestCase): log4j = self.sc._jvm.org.apache.log4j old_level = log4j.LogManager.getRootLogger().getLevel() log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL) + def func(x): from userlibrary import UserClass return UserClass().hello() @@ -355,8 +356,8 @@ class TestInputFormat(PySparkTestCase): self.assertEqual(doubles, ed) bytes = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbytes/", - "org.apache.hadoop.io.IntWritable", - "org.apache.hadoop.io.BytesWritable").collect()) + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.BytesWritable").collect()) ebs = [(1, bytearray('aa', 'utf-8')), (1, bytearray('aa', 'utf-8')), (2, bytearray('aa', 'utf-8')), @@ -428,9 +429,9 @@ class TestInputFormat(PySparkTestCase): self.assertEqual(clazz[0], ec) unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/", - "org.apache.hadoop.io.Text", - "org.apache.spark.api.python.TestWritable", - batchSize=1).collect()) + "org.apache.hadoop.io.Text", + "org.apache.spark.api.python.TestWritable", + batchSize=1).collect()) self.assertEqual(unbatched_clazz[0], ec) def test_oldhadoop(self): @@ -443,7 +444,7 @@ class TestInputFormat(PySparkTestCase): self.assertEqual(ints, ei) hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt") - oldconf = {"mapred.input.dir" : hellopath} + oldconf = {"mapred.input.dir": hellopath} hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text", @@ -462,7 +463,7 @@ class TestInputFormat(PySparkTestCase): self.assertEqual(ints, ei) hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt") - newconf = {"mapred.input.dir" : hellopath} + newconf = {"mapred.input.dir": hellopath} hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text", @@ -517,6 +518,7 @@ class TestInputFormat(PySparkTestCase): (u'\x03', [2.0])] self.assertEqual(maps, em) + class TestOutputFormat(PySparkTestCase): def setUp(self): @@ -574,8 +576,8 @@ class TestOutputFormat(PySparkTestCase): def test_oldhadoop(self): basepath = self.tempdir.name dict_data = [(1, {}), - (1, {"row1" : 1.0}), - (2, {"row2" : 2.0})] + (1, {"row1": 1.0}), + (2, {"row2": 2.0})] self.sc.parallelize(dict_data).saveAsHadoopFile( basepath + "/oldhadoop/", "org.apache.hadoop.mapred.SequenceFileOutputFormat", @@ -589,12 +591,13 @@ class TestOutputFormat(PySparkTestCase): self.assertEqual(result, dict_data) conf = { - "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat", - "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable", - "mapred.output.value.class" : "org.apache.hadoop.io.MapWritable", - "mapred.output.dir" : basepath + "/olddataset/"} + "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat", + "mapred.output.key.class": "org.apache.hadoop.io.IntWritable", + "mapred.output.value.class": "org.apache.hadoop.io.MapWritable", + "mapred.output.dir": basepath + "/olddataset/" + } self.sc.parallelize(dict_data).saveAsHadoopDataset(conf) - input_conf = {"mapred.input.dir" : basepath + "/olddataset/"} + input_conf = {"mapred.input.dir": basepath + "/olddataset/"} old_dataset = sorted(self.sc.hadoopRDD( "org.apache.hadoop.mapred.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", @@ -622,14 +625,17 @@ class TestOutputFormat(PySparkTestCase): valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect()) self.assertEqual(result, array_data) - conf = {"mapreduce.outputformat.class" : - "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", - "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable", - "mapred.output.value.class" : "org.apache.spark.api.python.DoubleArrayWritable", - "mapred.output.dir" : basepath + "/newdataset/"} - self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(conf, + conf = { + "mapreduce.outputformat.class": + "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", + "mapred.output.key.class": "org.apache.hadoop.io.IntWritable", + "mapred.output.value.class": "org.apache.spark.api.python.DoubleArrayWritable", + "mapred.output.dir": basepath + "/newdataset/" + } + self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset( + conf, valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter") - input_conf = {"mapred.input.dir" : basepath + "/newdataset/"} + input_conf = {"mapred.input.dir": basepath + "/newdataset/"} new_dataset = sorted(self.sc.newAPIHadoopRDD( "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", @@ -640,7 +646,7 @@ class TestOutputFormat(PySparkTestCase): def test_newolderror(self): basepath = self.tempdir.name - rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) + rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x)) self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile( basepath + "/newolderror/saveAsHadoopFile/", "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")) @@ -650,7 +656,7 @@ class TestOutputFormat(PySparkTestCase): def test_bad_inputs(self): basepath = self.tempdir.name - rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) + rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x)) self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile( basepath + "/badinputs/saveAsHadoopFile/", "org.apache.hadoop.mapred.NotValidOutputFormat")) @@ -685,30 +691,32 @@ class TestOutputFormat(PySparkTestCase): result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect()) self.assertEqual(result1, data) - rdd.saveAsHadoopFile(basepath + "/reserialize/hadoop", - "org.apache.hadoop.mapred.SequenceFileOutputFormat") + rdd.saveAsHadoopFile( + basepath + "/reserialize/hadoop", + "org.apache.hadoop.mapred.SequenceFileOutputFormat") result2 = sorted(self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect()) self.assertEqual(result2, data) - rdd.saveAsNewAPIHadoopFile(basepath + "/reserialize/newhadoop", - "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat") + rdd.saveAsNewAPIHadoopFile( + basepath + "/reserialize/newhadoop", + "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat") result3 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect()) self.assertEqual(result3, data) conf4 = { - "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat", - "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable", - "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable", - "mapred.output.dir" : basepath + "/reserialize/dataset"} + "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat", + "mapred.output.key.class": "org.apache.hadoop.io.IntWritable", + "mapred.output.value.class": "org.apache.hadoop.io.IntWritable", + "mapred.output.dir": basepath + "/reserialize/dataset"} rdd.saveAsHadoopDataset(conf4) result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect()) self.assertEqual(result4, data) - conf5 = {"mapreduce.outputformat.class" : - "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", - "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable", - "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable", - "mapred.output.dir" : basepath + "/reserialize/newdataset"} + conf5 = {"mapreduce.outputformat.class": + "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", + "mapred.output.key.class": "org.apache.hadoop.io.IntWritable", + "mapred.output.value.class": "org.apache.hadoop.io.IntWritable", + "mapred.output.dir": basepath + "/reserialize/newdataset"} rdd.saveAsNewAPIHadoopDataset(conf5) result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect()) self.assertEqual(result5, data) @@ -719,25 +727,28 @@ class TestOutputFormat(PySparkTestCase): self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile( basepath + "/unbatched/") - unbatched_sequence = sorted(self.sc.sequenceFile(basepath + "/unbatched/", + unbatched_sequence = sorted(self.sc.sequenceFile( + basepath + "/unbatched/", batchSize=1).collect()) self.assertEqual(unbatched_sequence, ei) - unbatched_hadoopFile = sorted(self.sc.hadoopFile(basepath + "/unbatched/", + unbatched_hadoopFile = sorted(self.sc.hadoopFile( + basepath + "/unbatched/", "org.apache.hadoop.mapred.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.Text", batchSize=1).collect()) self.assertEqual(unbatched_hadoopFile, ei) - unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(basepath + "/unbatched/", + unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile( + basepath + "/unbatched/", "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.Text", batchSize=1).collect()) self.assertEqual(unbatched_newAPIHadoopFile, ei) - oldconf = {"mapred.input.dir" : basepath + "/unbatched/"} + oldconf = {"mapred.input.dir": basepath + "/unbatched/"} unbatched_hadoopRDD = sorted(self.sc.hadoopRDD( "org.apache.hadoop.mapred.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", @@ -746,7 +757,7 @@ class TestOutputFormat(PySparkTestCase): batchSize=1).collect()) self.assertEqual(unbatched_hadoopRDD, ei) - newconf = {"mapred.input.dir" : basepath + "/unbatched/"} + newconf = {"mapred.input.dir": basepath + "/unbatched/"} unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD( "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", @@ -763,7 +774,9 @@ class TestOutputFormat(PySparkTestCase): self.assertRaises(Exception, lambda: rdd.saveAsSequenceFile( basepath + "/malformed/sequence")) + class TestDaemon(unittest.TestCase): + def connect(self, port): from socket import socket, AF_INET, SOCK_STREAM sock = socket(AF_INET, SOCK_STREAM) @@ -810,12 +823,15 @@ class TestDaemon(unittest.TestCase): class TestWorker(PySparkTestCase): + def test_cancel_task(self): temp = tempfile.NamedTemporaryFile(delete=True) temp.close() path = temp.name + def sleep(x): - import os, time + import os + import time with open(path, 'w') as f: f.write("%d %d" % (os.getppid(), os.getpid())) time.sleep(100) @@ -845,7 +861,7 @@ class TestWorker(PySparkTestCase): os.kill(worker_pid, 0) time.sleep(0.1) except OSError: - break # worker was killed + break # worker was killed else: self.fail("worker has not been killed after 5 seconds") @@ -855,12 +871,13 @@ class TestWorker(PySparkTestCase): self.fail("daemon had been killed") def test_fd_leak(self): - N = 1100 # fd limit is 1024 by default + N = 1100 # fd limit is 1024 by default rdd = self.sc.parallelize(range(N), N) self.assertEquals(N, rdd.count()) class TestSparkSubmit(unittest.TestCase): + def setUp(self): self.programDir = tempfile.mkdtemp() self.sparkSubmit = os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit") @@ -953,9 +970,9 @@ class TestSparkSubmit(unittest.TestCase): |def myfunc(x): | return x + 1 """) - proc = subprocess.Popen( - [self.sparkSubmit, "--py-files", zip, "--master", "local-cluster[1,1,512]", script], - stdout=subprocess.PIPE) + proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, "--master", + "local-cluster[1,1,512]", script], + stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) self.assertIn("[2, 3, 4]", out) @@ -981,6 +998,7 @@ class TestSparkSubmit(unittest.TestCase): @unittest.skipIf(not _have_scipy, "SciPy not installed") class SciPyTests(PySparkTestCase): + """General PySpark tests that depend on scipy """ def test_serialize(self): @@ -993,15 +1011,16 @@ class SciPyTests(PySparkTestCase): @unittest.skipIf(not _have_numpy, "NumPy not installed") class NumPyTests(PySparkTestCase): + """General PySpark tests that depend on numpy """ def test_statcounter_array(self): - x = self.sc.parallelize([np.array([1.0,1.0]), np.array([2.0,2.0]), np.array([3.0,3.0])]) + x = self.sc.parallelize([np.array([1.0, 1.0]), np.array([2.0, 2.0]), np.array([3.0, 3.0])]) s = x.stats() - self.assertSequenceEqual([2.0,2.0], s.mean().tolist()) - self.assertSequenceEqual([1.0,1.0], s.min().tolist()) - self.assertSequenceEqual([3.0,3.0], s.max().tolist()) - self.assertSequenceEqual([1.0,1.0], s.sampleStdev().tolist()) + self.assertSequenceEqual([2.0, 2.0], s.mean().tolist()) + self.assertSequenceEqual([1.0, 1.0], s.min().tolist()) + self.assertSequenceEqual([3.0, 3.0], s.max().tolist()) + self.assertSequenceEqual([1.0, 1.0], s.sampleStdev().tolist()) if __name__ == "__main__": diff --git a/python/test_support/userlibrary.py b/python/test_support/userlibrary.py index 8e4a6292bc17c53391fd50acc5d838b9173af8f2..73fd26e71f10d1e0a7d543d0bbe0d0229e0681a5 100755 --- a/python/test_support/userlibrary.py +++ b/python/test_support/userlibrary.py @@ -19,6 +19,8 @@ Used to test shipping of code depenencies with SparkContext.addPyFile(). """ + class UserClass(object): + def hello(self): return "Hello World!" diff --git a/tox.ini b/tox.ini index 44766e529bf7f08ad8427c7d278869d4f184c465..a1fefdd0e176f57159dceedf4bc79c0dfabc0920 100644 --- a/tox.ini +++ b/tox.ini @@ -15,3 +15,4 @@ [pep8] max-line-length=100 +exclude=cloudpickle.py