Commit d614967b
[SPARK-2627] [PySpark] have the build enforce PEP 8 automatically
Nicholas Chammas authored
    As described in [SPARK-2627](https://issues.apache.org/jira/browse/SPARK-2627), we'd like Python code to automatically be checked for PEP 8 compliance by Jenkins. This pull request aims to do that.
    
    Notes:
    * We may need to install [`pep8`](https://pypi.python.org/pypi/pep8) on the build server.
    * I'm expecting tests to fail now that PEP 8 compliance is being checked as part of the build. I'm fine with cleaning up any remaining PEP 8 violations as part of this pull request.
    * I did not understand why the RAT and scalastyle reports are saved to text files. I did the same for the PEP 8 check, but only so that the console output style matches that of the RAT and scalastyle checks. The PEP 8 report is removed right after the check completes; a rough sketch of such a check appears after this list.
    * Updates to the ["Contributing to Spark"](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark) guide will be submitted elsewhere, as I don't believe that text is part of the Spark repo.
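    
    For illustration only, here is a minimal Python sketch of the kind of check described above. It is not the actual script added by this pull request; it assumes the `pep8` package is importable and that style settings and file exclusions live in `tox.ini`.
    
    ```python
    # Hypothetical sketch (not the PR's actual lint script): run pep8 over the
    # PySpark sources and fail the build on any violation.
    import sys
    
    import pep8  # assumed to be installed, or downloaded at runtime as the commit log describes
    
    # pep8 can read its settings, including excluded files, from a config file such as tox.ini.
    style = pep8.StyleGuide(config_file="tox.ini")
    report = style.check_files(["python/pyspark"])
    
    if report.total_errors > 0:
        sys.exit(1)  # a non-zero exit status is what makes Jenkins mark the check as failed
    ```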
    
    Author: Nicholas Chammas <nicholas.chammas@gmail.com>
    Author: nchammas <nicholas.chammas@gmail.com>
    
    Closes #1744 from nchammas/master and squashes the following commits:
    
    274b238 [Nicholas Chammas] [SPARK-2627] [PySpark] minor indentation changes
    983d963 [nchammas] Merge pull request #5 from apache/master
    1db5314 [nchammas] Merge pull request #4 from apache/master
    0e0245f [Nicholas Chammas] [SPARK-2627] undo erroneous whitespace fixes
    bf30942 [Nicholas Chammas] [SPARK-2627] PEP8: comment spacing
    6db9a44 [nchammas] Merge pull request #3 from apache/master
    7b4750e [Nicholas Chammas] merge upstream changes
    91b7584 [Nicholas Chammas] [SPARK-2627] undo unnecessary line breaks
    44e3e56 [Nicholas Chammas] [SPARK-2627] use tox.ini to exclude files
    b09fae2 [Nicholas Chammas] don't wrap comments unnecessarily
    bfb9f9f [Nicholas Chammas] [SPARK-2627] keep up with the PEP 8 fixes
    9da347f [nchammas] Merge pull request #2 from apache/master
    aa5b4b5 [Nicholas Chammas] [SPARK-2627] follow Spark bash style for if blocks
    d0a83b9 [Nicholas Chammas] [SPARK-2627] check that pep8 downloaded fine
    dffb5dd [Nicholas Chammas] [SPARK-2627] download pep8 at runtime
    a1ce7ae [Nicholas Chammas] [SPARK-2627] space out test report sections
    21da538 [Nicholas Chammas] [SPARK-2627] it's PEP 8, not PEP8
    6f4900b [Nicholas Chammas] [SPARK-2627] more misc PEP 8 fixes
    fe57ed0 [Nicholas Chammas] removing merge conflict backups
    9c01d4c [nchammas] Merge pull request #1 from apache/master
    9a66cb0 [Nicholas Chammas] resolving merge conflicts
    a31ccc4 [Nicholas Chammas] [SPARK-2627] miscellaneous PEP 8 fixes
    beaa9ac [Nicholas Chammas] [SPARK-2627] fail check on non-zero status
    723ed39 [Nicholas Chammas] always delete the report file
    0541ebb [Nicholas Chammas] [SPARK-2627] call Python linter from run-tests
    12440fa [Nicholas Chammas] [SPARK-2627] add Scala linter
    61c07b9 [Nicholas Chammas] [SPARK-2627] add Python linter
    75ad552 [Nicholas Chammas] make check output style consistent
classification.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy

from numpy import array, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
    _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
    _serialize_double_matrix, _deserialize_double_matrix, \
    _serialize_double_vector, _deserialize_double_vector, \
    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
    _linear_predictor_typecheck, _get_unmangled_labeled_point_rdd
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint, LinearModel
from math import exp, log


class LogisticRegressionModel(LinearModel):

    """A linear binary classification model derived from logistic regression.

    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(1.0, [2.0]),
    ...     LabeledPoint(1.0, [3.0])
    ... ]
    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
    >>> lrm.predict(array([1.0])) > 0
    True
    >>> lrm.predict(array([0.0])) <= 0
    True
    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
    ... ]
    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
    >>> lrm.predict(array([0.0, 1.0])) > 0
    True
    >>> lrm.predict(array([0.0, 0.0])) <= 0
    True
    >>> lrm.predict(SparseVector(2, {1: 1.0})) > 0
    True
    >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0
    True
    """

    def predict(self, x):
        _linear_predictor_typecheck(x, self._coeff)
        margin = _dot(x, self._coeff) + self._intercept
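        # Evaluate the logistic (sigmoid) function 1 / (1 + exp(-margin)) in a
        # numerically stable way: branch on the sign of the margin so that exp()
        # is never called on a large positive argument, avoiding overflow.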
        if margin > 0:
            prob = 1 / (1 + exp(-margin))
        else:
            exp_margin = exp(margin)
            prob = exp_margin / (1 + exp_margin)
        return 1 if prob > 0.5 else 0


class LogisticRegressionWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
              initialWeights=None, regParam=1.0, regType=None, intercept=False):
        """
        Train a logistic regression model on the given data.

        @param data:              The training data.
        @param iterations:        The number of iterations (default: 100).
        @param step:              The step parameter used in SGD
                                  (default: 1.0).
        @param miniBatchFraction: Fraction of data to be used for each SGD
                                  iteration.
        @param initialWeights:    The initial weights (default: None).
        @param regParam:          The regularizer parameter (default: 1.0).
        @param regType:           The type of regularizer used for training
                                  our model.
                                  Allowed values: "l1" for using L1Updater,
                                                  "l2" for using
                                                       SquaredL2Updater,
                                                  "none" for no regularizer.
                                  (default: "none")
        @param intercept:         Boolean parameter which indicates the use
                                  or not of the augmented representation for
                                  training data (i.e. whether bias features
                                  are activated or not).
        """
        sc = data.context
        if regType is None:
            regType = "none"
        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(
            d._jrdd, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
        return _regression_train_wrapper(sc, train_func, LogisticRegressionModel, data,
                                         initialWeights)


class SVMModel(LinearModel):

    """A support vector machine.

    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(1.0, [2.0]),
    ...     LabeledPoint(1.0, [3.0])
    ... ]
    >>> svm = SVMWithSGD.train(sc.parallelize(data))
    >>> svm.predict(array([1.0])) > 0
    True
    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {0: -1.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
    ... ]
    >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
    >>> svm.predict(SparseVector(2, {1: 1.0})) > 0
    True
    >>> svm.predict(SparseVector(2, {0: -1.0})) <= 0
    True
    """

    def predict(self, x):
        _linear_predictor_typecheck(x, self._coeff)
        margin = _dot(x, self._coeff) + self._intercept
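        # Classify by the sign of the margin from the separating hyperplane.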
        return 1 if margin >= 0 else 0


class SVMWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None, regType=None, intercept=False):
        """
        Train a support vector machine on the given data.

        @param data:              The training data.
        @param iterations:        The number of iterations (default: 100).
        @param step:              The step parameter used in SGD
                                  (default: 1.0).
        @param regParam:          The regularizer parameter (default: 1.0).
        @param miniBatchFraction: Fraction of data to be used for each SGD
                                  iteration.
        @param initialWeights:    The initial weights (default: None).
        @param regType:           The type of regularizer used for training
                                  our model.
                                  Allowed values: "l1" for using L1Updater,
                                                  "l2" for using
                                                       SquaredL2Updater,
                                                  "none" for no regularizer.
                                  (default: "none")
        @param intercept:         Boolean parameter which indicates the use
                                  or not of the augmented representation for
                                  training data (i.e. whether bias features
                                  are activated or not).
        """
        sc = data.context
        if regType is None:
            regType = "none"
        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(
            d._jrdd, iterations, step, regParam, miniBatchFraction, i, regType, intercept)
        return _regression_train_wrapper(sc, train_func, SVMModel, data, initialWeights)


class NaiveBayesModel(object):

    """
    Model for Naive Bayes classifiers.

    Contains two parameters:
    - pi: vector of logs of class priors (dimension C)
    - theta: matrix of logs of class conditional probabilities (CxD)

    >>> data = [
    ...     LabeledPoint(0.0, [0.0, 0.0]),
    ...     LabeledPoint(0.0, [0.0, 1.0]),
    ...     LabeledPoint(1.0, [1.0, 0.0]),
    ... ]
    >>> model = NaiveBayes.train(sc.parallelize(data))
    >>> model.predict(array([0.0, 1.0]))
    0.0
    >>> model.predict(array([1.0, 0.0]))
    1.0
    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
    ... ]
    >>> model = NaiveBayes.train(sc.parallelize(sparse_data))
    >>> model.predict(SparseVector(2, {1: 1.0}))
    0.0
    >>> model.predict(SparseVector(2, {0: 1.0}))
    1.0
    """

    def __init__(self, labels, pi, theta):
        self.labels = labels
        self.pi = pi
        self.theta = theta

    def predict(self, x):
        """Return the most likely class for a data vector x"""
        return self.labels[numpy.argmax(self.pi + _dot(x, self.theta.transpose()))]


class NaiveBayes(object):

    @classmethod
    def train(cls, data, lambda_=1.0):
        """
        Train a Naive Bayes model given an RDD of (label, features) vectors.

        This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which can
        handle all kinds of discrete data.  For example, by converting
        documents into TF-IDF vectors, it can be used for document
        classification.  By making every vector a 0-1 vector, it can also be
        used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).

        @param data: RDD of NumPy vectors, one per element, where the first
               coordinate is the label and the rest is the feature vector
               (e.g. a count vector).
        @param lambda_: The smoothing parameter
        """
        sc = data.context
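        # Serialize the labeled points and train on the JVM side; the returned triple
        # holds the class labels, the log priors (pi), and the log conditional
        # probabilities (theta), in that order.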
        dataBytes = _get_unmangled_labeled_point_rdd(data)
        ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
        return NaiveBayesModel(
            _deserialize_double_vector(ans[0]),
            _deserialize_double_vector(ans[1]),
            _deserialize_double_matrix(ans[2]))


def _test():
    import doctest
    globs = globals().copy()
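    # The doctests above expect a SparkContext named `sc` in scope; create a local one.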
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

if __name__ == "__main__":
    _test()