    [SPARK-2478] [mllib] DecisionTree Python API · 3f67382e
    Joseph K. Bradley authored
    Added experimental Python API for Decision Trees.
    
    API:
    * class DecisionTreeModel
    ** predict() for single examples and RDDs, taking both feature vectors and LabeledPoints
    ** numNodes()
    ** depth()
    ** __str__()
    * class DecisionTree
    ** trainClassifier()
    ** trainRegressor()
    ** train()
    
    Examples and testing:
    * Added example testing classification and regression with batch prediction: examples/src/main/python/mllib/tree.py
    * Have also tested example usage in doc of python/pyspark/mllib/tree.py which tests single-example prediction with dense and sparse vectors
    
    Also: Small bug fix in python/pyspark/mllib/_common.py: In _linear_predictor_typecheck, changed check for RDD to use isinstance() instead of type() in order to catch RDD subclasses.
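The difference between the two checks can be sketched without Spark. The classes below are hypothetical stand-ins, not the real pyspark classes; the sketch only illustrates that an exact type() comparison rejects subclasses while isinstance() accepts them.

```python
# Hypothetical stand-ins for an RDD class and one of its subclasses;
# these are NOT the real pyspark classes.
class FakeRDD(object):
    pass


class FakePipelinedRDD(FakeRDD):
    pass


def is_rdd_by_type(x):
    # Exact type comparison: True only when x is a FakeRDD itself.
    return type(x) == FakeRDD


def is_rdd_by_isinstance(x):
    # isinstance() also accepts instances of any subclass of FakeRDD.
    return isinstance(x, FakeRDD)


sub = FakePipelinedRDD()
type_check = is_rdd_by_type(sub)            # the subclass instance is missed
instance_check = is_rdd_by_isinstance(sub)  # the subclass instance is caught
```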
    
    CC mengxr manishamde
    
    Author: Joseph K. Bradley <joseph.kurata.bradley@gmail.com>
    
    Closes #1727 from jkbradley/decisiontree-python-new and squashes the following commits:
    
    3744488 [Joseph K. Bradley] Renamed test tree.py to decision_tree_runner.py Small updates based on github review.
    6b86a9d [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into decisiontree-python-new
    affceb9 [Joseph K. Bradley] * Fixed bug in doc tests in pyspark/mllib/util.py caused by change in loadLibSVMFile behavior.  (It used to threshold labels at 0 to make them 0/1, but it now leaves them as they are.) * Fixed small bug in loadLibSVMFile: If a data file had no features, then loadLibSVMFile would create a single all-zero feature.
    67a29bc [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into decisiontree-python-new
    cf46ad7 [Joseph K. Bradley] Python DecisionTreeModel * predict(empty RDD) returns an empty RDD instead of an error. * Removed support for calling predict() on LabeledPoint and RDD[LabeledPoint] * predict() does not cache serialized RDD any more.
    aa29873 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into decisiontree-python-new
    bf21be4 [Joseph K. Bradley] removed old run() func from DecisionTree
    fa10ea7 [Joseph K. Bradley] Small style update
    7968692 [Joseph K. Bradley] small braces typo fix
    e34c263 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into decisiontree-python-new
    4801b40 [Joseph K. Bradley] Small style update to DecisionTreeSuite
    db0eab2 [Joseph K. Bradley] Merge branch 'decisiontree-bugfix2' into decisiontree-python-new
    6873fa9 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into decisiontree-python-new
    225822f [Joseph K. Bradley] Bug: In DecisionTree, the method sequentialBinSearchForOrderedCategoricalFeatureInClassification() indexed bins from 0 to (math.pow(2, featureCategories.toInt - 1) - 1). This upper bound is the bound for unordered categorical features, not ordered ones. The upper bound should be the arity (i.e., max value) of the feature.
    93953f1 [Joseph K. Bradley] Likely done with Python API.
    6df89a9 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into decisiontree-python-new
    4562c08 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into decisiontree-python-new
    665ba78 [Joseph K. Bradley] Small updates towards Python DecisionTree API
    188cb0d [Joseph K. Bradley] Merge branch 'decisiontree-bugfix' into decisiontree-python-new
    6622247 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into decisiontree-python-new
    b8fac57 [Joseph K. Bradley] Finished Python DecisionTree API and example but need to test a bit more.
    2b20c61 [Joseph K. Bradley] Small doc and style updates
    1b29c13 [Joseph K. Bradley] Merge branch 'decisiontree-bugfix' into decisiontree-python-new
    584449a [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into decisiontree-python-new
    dab0b67 [Joseph K. Bradley] Added documentation for DecisionTree internals
    8bb8aa0 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into decisiontree-bugfix
    978cfcf [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into decisiontree-bugfix
    6eed482 [Joseph K. Bradley] In DecisionTree: Changed from using procedural syntax for functions returning Unit to explicitly writing Unit return type.
    376dca2 [Joseph K. Bradley] Updated meaning of maxDepth by 1 to fit scikit-learn and rpart. * In code, replaced usages of maxDepth <-- maxDepth + 1 * In params, replace settings of maxDepth <-- maxDepth - 1
    e06e423 [Joseph K. Bradley] Merge branch 'decisiontree-bugfix' into decisiontree-python-new
    bab3f19 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into decisiontree-python-new
    59750f8 [Joseph K. Bradley] * Updated Strategy to check numClassesForClassification only if algo=Classification. * Updates based on comments: ** DecisionTreeRunner *** Made dataFormat arg default to libsvm ** Small cleanups ** tree.Node: Made recursive helper methods private, and renamed them.
    52e17c5 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into decisiontree-bugfix
    f5a036c [Joseph K. Bradley] Merge branch 'decisiontree-bugfix' into decisiontree-python-new
    da50db7 [Joseph K. Bradley] Added one more test to DecisionTreeSuite: stump with 2 continuous variables for binary classification.  Caused problems in past, but fixed now.
    8e227ea [Joseph K. Bradley] Changed Strategy so it only requires numClassesForClassification >= 2 for classification
    cd1d933 [Joseph K. Bradley] Merge branch 'decisiontree-bugfix' into decisiontree-python-new
    8ea8750 [Joseph K. Bradley] Bug fix: Off-by-1 when finding thresholds for splits for continuous features.
    8a758db [Joseph K. Bradley] Merge branch 'decisiontree-bugfix' into decisiontree-python-new
    5fe44ed [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into decisiontree-python-new
    2283df8 [Joseph K. Bradley] 2 bug fixes.
    73fbea2 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into decisiontree-bugfix
    5f920a1 [Joseph K. Bradley] Demonstration of bug before submitting fix: Updated DecisionTreeSuite so that 3 tests fail.  Will describe bug in next commit.
    f825352 [Joseph K. Bradley] Wrote Python API and example for DecisionTree.  Also added toString, depth, and numNodes methods to DecisionTreeModel.
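
The bin-indexing bug described in commit 225822f comes down to two different upper bounds. A small sketch of that arithmetic follows; the helper names are hypothetical and do not appear in the Spark source.

```python
def unordered_upper_bound(arity):
    # Bound quoted in the commit message for unordered categorical
    # features: 2^(arity - 1) - 1.
    return 2 ** (arity - 1) - 1


def ordered_upper_bound(arity):
    # Bound the commit message says ordered features should use: the arity.
    return arity


# Compare the two bounds for arities 2 through 5.
bounds = [(k, unordered_upper_bound(k), ordered_upper_bound(k))
          for k in range(2, 6)]
# bounds == [(2, 1, 2), (3, 3, 3), (4, 7, 4), (5, 15, 5)]
```

The two bounds coincide at arity 3 and diverge elsewhere, so a check that only uses a three-category feature would not distinguish them.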
util.py 8.06 KiB
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy as np
import warnings

from pyspark.mllib.linalg import Vectors, SparseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib._common import _convert_vector, _deserialize_labeled_point
from pyspark.rdd import RDD
from pyspark.serializers import NoOpSerializer


class MLUtils:
    """
    Helper methods to load, save and pre-process data used in MLlib.
    """

    @staticmethod
    def _parse_libsvm_line(line, multiclass=None):
        """
        Parses a line in LIBSVM format into (label, indices, values).
        """
        # multiclass is deprecated and ignored; warn only if a caller passes it.
        if multiclass is not None:
            warnings.warn("the multiclass argument is deprecated and ignored",
                          DeprecationWarning)
        # First token is the label; the rest are one-based "index:value" pairs.
        items = line.split(None)
        label = float(items[0])
        nnz = len(items) - 1
        indices = np.zeros(nnz, dtype=np.int32)
        values = np.zeros(nnz)
        for i in xrange(nnz):
            index, value = items[1 + i].split(":")
            indices[i] = int(index) - 1  # convert to zero-based
            values[i] = float(value)
        return label, indices, values

    @staticmethod
    def _convert_labeled_point_to_libsvm(p):
        """Converts a LabeledPoint to a string in LIBSVM format."""
        items = [str(p.label)]
        v = _convert_vector(p.features)
        if type(v) == np.ndarray:
            for i in xrange(len(v)):
                items.append(str(i + 1) + ":" + str(v[i]))
        elif type(v) == SparseVector:
            nnz = len(v.indices)
            for i in xrange(nnz):
                items.append(str(v.indices[i] + 1) + ":" + str(v.values[i]))
        else:
            raise TypeError("_convert_labeled_point_to_libsvm needs either ndarray or SparseVector"
                            " but got %s" % type(v))
        return " ".join(items)

    # The multiclass argument is deprecated and ignored; it is accepted last
    # only so that callers passing it by keyword keep working.
    @staticmethod
    def loadLibSVMFile(sc, path, numFeatures=-1, minPartitions=None, multiclass=None):
        """
        Loads labeled data in the LIBSVM format into an RDD of
        LabeledPoint. The LIBSVM format is a text-based format used by
        LIBSVM and LIBLINEAR. Each line represents a labeled sparse
        feature vector using the following format:

        label index1:value1 index2:value2 ...

        where the indices are one-based and in ascending order. This
        method parses each line into a LabeledPoint, where the feature
        indices are converted to zero-based.

        @param sc: Spark context
        @param path: file or directory path in any Hadoop-supported file
                     system URI
        @param numFeatures: number of features, which will be determined
                            from the input data if a nonpositive value
                            is given. This is useful when the dataset is
                            already split into multiple files and you
                            want to load them separately, because some
                            features may not be present in certain files,
                            which leads to inconsistent feature
                            dimensions.
        @param minPartitions: min number of partitions
        @return: labeled data stored as an RDD of LabeledPoint

        >>> from tempfile import NamedTemporaryFile
        >>> from pyspark.mllib.util import MLUtils
        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.write("+1 1:1.0 3:2.0 5:3.0\\n-1\\n-1 2:4.0 4:5.0 6:6.0")
        >>> tempFile.flush()
        >>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect()
        >>> tempFile.close()
        >>> type(examples[0]) == LabeledPoint
        True
        >>> print examples[0]
        (1.0,(6,[0,2,4],[1.0,2.0,3.0]))
        >>> type(examples[1]) == LabeledPoint
        True
        >>> print examples[1]
        (-1.0,(6,[],[]))
        >>> type(examples[2]) == LabeledPoint
        True
        >>> print examples[2]
        (-1.0,(6,[1,3,5],[4.0,5.0,6.0]))
        """

        lines = sc.textFile(path, minPartitions)
        parsed = lines.map(lambda l: MLUtils._parse_libsvm_line(l))
        if numFeatures <= 0:
            parsed.cache()
            numFeatures = parsed.map(lambda x: -1 if x[1].size == 0 else x[1][-1]).reduce(max) + 1
        return parsed.map(lambda x: LabeledPoint(x[0], Vectors.sparse(numFeatures, x[1], x[2])))

    @staticmethod
    def saveAsLibSVMFile(data, dir):
        """
        Save labeled data in LIBSVM format.

        @param data: an RDD of LabeledPoint to be saved
        @param dir: directory to save the data

        >>> from tempfile import NamedTemporaryFile
        >>> from fileinput import input
        >>> from glob import glob
        >>> from pyspark.mllib.util import MLUtils
        >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), \
                        LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> MLUtils.saveAsLibSVMFile(sc.parallelize(examples), tempFile.name)
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0.0 1:1.01 2:2.02 3:3.03\\n1.1 1:1.23 3:4.56\\n'
        """
        lines = data.map(lambda p: MLUtils._convert_labeled_point_to_libsvm(p))
        lines.saveAsTextFile(dir)

    @staticmethod
    def loadLabeledPoints(sc, path, minPartitions=None):
        """
        Load labeled points saved using RDD.saveAsTextFile.

        @param sc: Spark context
        @param path: file or directory path in any Hadoop-supported file
                     system URI
        @param minPartitions: min number of partitions
        @return: labeled data stored as an RDD of LabeledPoint

        >>> from tempfile import NamedTemporaryFile
        >>> from pyspark.mllib.util import MLUtils
        >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])), \
                        LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name)
        >>> loaded = MLUtils.loadLabeledPoints(sc, tempFile.name).collect()
        >>> type(loaded[0]) == LabeledPoint
        True
        >>> print examples[0]
        (1.1,(3,[0,2],[-1.23,4.56e-07]))
        >>> type(examples[1]) == LabeledPoint
        True
        >>> print examples[1]
        (0.0,[1.01,2.02,3.03])
        """
        minPartitions = minPartitions or min(sc.defaultParallelism, 2)
        jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
        serialized = RDD(jSerialized, sc, NoOpSerializer())
        return serialized.map(lambda bytes: _deserialize_labeled_point(bytearray(bytes)))


def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()
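
As a rough, Spark-free illustration of the LIBSVM handling above, the sketch below parses lines into (label, indices, values) using plain lists, then infers the feature count in the way loadLibSVMFile does when numFeatures is nonpositive: the largest zero-based index plus one, with -1 standing in for lines that have no features. The function names are hypothetical and this is not the MLUtils implementation.

```python
def parse_libsvm_line(line):
    # Split on whitespace: the first token is the label, the rest are
    # one-based "index:value" pairs.
    items = line.split()
    label = float(items[0])
    indices = []
    values = []
    for item in items[1:]:
        index, value = item.split(":")
        indices.append(int(index) - 1)  # convert to zero-based
        values.append(float(value))
    return label, indices, values


def infer_num_features(parsed):
    # Largest zero-based index plus one; a line with no features
    # contributes -1, so a file with only empty lines yields 0 features.
    # Assumes the indices within each line are in ascending order.
    return max(-1 if not idx else idx[-1] for _, idx, _ in parsed) + 1


lines = ["+1 1:1.0 3:2.0 5:3.0", "-1", "-1 2:4.0 4:5.0 6:6.0"]
parsed = [parse_libsvm_line(l) for l in lines]
num_features = infer_num_features(parsed)
```

Labels are kept exactly as written (here 1.0 and -1.0), which matches the behavior change noted in commit affceb9.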