Commit fae095bc authored by Davies Liu, committed by Xiangrui Meng

[SPARK-3961] [MLlib] [PySpark] Python API for mllib.feature

Added a complete Python API for mllib.feature:

Normalizer
StandardScalerModel
StandardScaler
HashingTF
IDFModel
IDF

cc mengxr

Author: Davies Liu <davies@databricks.com>
Author: Davies Liu <davies.liu@gmail.com>

Closes #2819 from davies/feature and squashes the following commits:

4f48f48 [Davies Liu] add a note for HashingTF
67f6d21 [Davies Liu] address comments
b628693 [Davies Liu] rollback changes in Word2Vec
efb4f4f [Davies Liu] Merge branch 'master' into feature
806c7c2 [Davies Liu] address comments
3abb8c2 [Davies Liu] address comments
59781b9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into feature
a405ae7 [Davies Liu] fix tests
7a1891a [Davies Liu] fix tests
486795f [Davies Liu] update programming guide, HashTF -> HashingTF
8a50584 [Davies Liu] Python API for mllib.feature
parent 46c63417
@@ -95,8 +95,49 @@ tf.cache()
val idf = new IDF(minDocFreq = 2).fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
TF and IDF are implemented in [HashingTF](api/python/pyspark.mllib.html#pyspark.mllib.feature.HashingTF)
and [IDF](api/python/pyspark.mllib.html#pyspark.mllib.feature.IDF).
`HashingTF` takes an RDD of lists as the input.
Each record can be an iterable of strings or other hashable types.
{% highlight python %}
from pyspark import SparkContext
from pyspark.mllib.feature import HashingTF
sc = SparkContext()
# Load documents (one per line).
documents = sc.textFile("...").map(lambda line: line.split(" "))
hashingTF = HashingTF()
tf = hashingTF.transform(documents)
{% endhighlight %}
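`HashingTF` can also be applied to a single document, and `indexOf` exposes the hash bucket of a
term (this mirrors the doctest shipped with `pyspark.mllib.feature.HashingTF`; the exact bucket
indices depend on the hash values):
{% highlight python %}
from pyspark.mllib.feature import HashingTF

htf = HashingTF(100)            # 100 hash buckets instead of the default 2^20
doc = "a a b b c d".split(" ")
tf = htf.transform(doc)         # a single SparseVector of term counts
i = htf.indexOf("a")            # the bucket that the term "a" hashes into
{% endhighlight %}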
While applying `HashingTF` only needs a single pass over the data, applying `IDF` needs two passes:
the first to compute the IDF vector and the second to scale the term frequencies by IDF.
{% highlight python %}
from pyspark.mllib.feature import IDF
# ... continue from the previous example
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
{% endhighlight %}
MLlib's IDF implementation provides an option for ignoring terms which occur in fewer than a
minimum number of documents. In such cases, the IDF for these terms is set to 0. This feature
can be used by passing the `minDocFreq` value to the IDF constructor.
{% highlight python %}
# ... continue from the previous example
tf.cache()
idf = IDF(minDocFreq=2).fit(tf)
tfidf = idf.transform(tf)
{% endhighlight %}
</div>
</div>
@@ -223,6 +264,29 @@ val data1 = data.map(x => (x.label, scaler1.transform(x.features)))
val data2 = data.map(x => (x.label, scaler2.transform(Vectors.dense(x.features.toArray))))
{% endhighlight %}
</div>
<div data-lang="python">
{% highlight python %}
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
scaler1 = StandardScaler().fit(features)
scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)
# data1 will have unit variance.
data1 = label.zip(scaler1.transform(features))
# Without converting the features into dense vectors, transformation with zero mean will raise an
# exception on sparse vectors.
# data2 will have unit variance and zero mean.
data2 = label.zip(scaler2.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
{% endhighlight %}
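For a minimal end-to-end sketch on dense vectors (mirroring the doctest shipped with
`pyspark.mllib.feature.StandardScaler`; the sample values are illustrative only):
{% highlight python %}
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

vs = [Vectors.dense([-2.0, 2.3, 0]), Vectors.dense([3.8, 0.0, 1.9])]
dataset = sc.parallelize(vs)
standardizer = StandardScaler(withMean=True, withStd=True)
model = standardizer.fit(dataset)
# result is an RDD of vectors with zero mean and unit variance in each column
result = model.transform(dataset)
{% endhighlight %}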
</div>
</div>
## Normalizer
@@ -267,4 +331,25 @@ val data1 = data.map(x => (x.label, normalizer1.transform(x.features)))
val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))
{% endhighlight %}
</div>
<div data-lang="python">
{% highlight python %}
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import Normalizer
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
labels = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
normalizer1 = Normalizer()
normalizer2 = Normalizer(p=float("inf"))
# Each sample in data1 will be normalized using $L^2$ norm.
data1 = labels.zip(normalizer1.transform(features))
# Each sample in data2 will be normalized using $L^\infty$ norm.
data2 = labels.zip(normalizer2.transform(features))
{% endhighlight %}
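`Normalizer` also works on a single vector (this mirrors the doctest shipped with
`pyspark.mllib.feature.Normalizer`):
{% highlight python %}
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import Normalizer

v = Vectors.dense(range(3))                 # [0.0, 1.0, 2.0]
nor1 = Normalizer(p=1)
nor1.transform(v)                           # DenseVector([0.0, 0.3333, 0.6667])
norInf = Normalizer(p=float("inf"))
norInf.transform(v)                         # DenseVector([0.0, 0.5, 1.0])
{% endhighlight %}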
</div>
</div>
@@ -31,8 +31,7 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.feature.Word2Vec
import org.apache.spark.mllib.feature.Word2VecModel
import org.apache.spark.mllib.feature._
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.random.{RandomRDDs => RG}
@@ -291,6 +290,43 @@ class PythonMLLibAPI extends Serializable {
ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha)
}
/**
* Java stub for Normalizer.transform()
*/
def normalizeVector(p: Double, vector: Vector): Vector = {
new Normalizer(p).transform(vector)
}
/**
* Java stub for Normalizer.transform() over a JavaRDD
*/
def normalizeVector(p: Double, rdd: JavaRDD[Vector]): JavaRDD[Vector] = {
new Normalizer(p).transform(rdd)
}
/**
* Java stub for StandardScaler.fit(). This stub returns a
* handle to the Java object instead of the content of the Java object.
* Extra care needs to be taken in the Python code to ensure it gets freed on
* exit; see the Py4J documentation.
*/
def fitStandardScaler(
withMean: Boolean,
withStd: Boolean,
data: JavaRDD[Vector]): StandardScalerModel = {
new StandardScaler(withMean, withStd).fit(data.rdd)
}
/**
* Java stub for IDF.fit(). This stub returns a
* handle to the Java object instead of the content of the Java object.
* Extra care needs to be taken in the Python code to ensure it gets freed on
* exit; see the Py4J documentation.
*/
def fitIDF(minDocFreq: Int, dataset: JavaRDD[Vector]): IDFModel = {
new IDF(minDocFreq).fit(dataset)
}
/**
* Java stub for Python mllib Word2Vec fit(). This stub returns a
* handle to the Java object instead of the content of the Java object.
@@ -328,6 +364,15 @@ class PythonMLLibAPI extends Serializable {
model.transform(word)
}
/**
* Transforms an RDD of words to their vector representations
* @param rdd an RDD of words
* @return an RDD of vector representations of words
*/
def transform(rdd: JavaRDD[String]): JavaRDD[Vector] = {
rdd.rdd.map(model.transform)
}
def findSynonyms(word: String, num: Int): java.util.List[java.lang.Object] = {
val vec = transform(word)
findSynonyms(vec, num)
@@ -18,6 +18,7 @@
package org.apache.spark.mllib.feature
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
@@ -48,4 +49,14 @@ trait VectorTransformer extends Serializable {
data.map(x => this.transform(x))
}
/**
* Applies transformation on a JavaRDD[Vector].
*
* @param data JavaRDD[Vector] to be transformed.
* @return transformed JavaRDD[Vector].
*/
def transform(data: JavaRDD[Vector]): JavaRDD[Vector] = {
transform(data.rdd)
}
}
@@ -432,7 +432,7 @@ class Word2VecModel private[mllib] (
throw new IllegalStateException(s"$word not in vocabulary")
}
}
/**
* Find synonyms of a word
* @param word a word
@@ -443,7 +443,7 @@ class Word2VecModel private[mllib] (
val vector = transform(word)
findSynonyms(vector,num)
}
/**
* Find synonyms of the vector representation of a word
* @param vector vector representation of a word
@@ -18,59 +18,357 @@
"""
Python package for feature in MLlib.
"""
import sys
import warnings
import py4j.protocol
from py4j.protocol import Py4JJavaError
from py4j.java_gateway import JavaObject
from pyspark import RDD, SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
'HashingTF', 'IDFModel', 'IDF', 'Word2Vec', 'Word2VecModel']
# Hack to support float('inf') in Py4J
_old_smart_decode = py4j.protocol.smart_decode
_float_str_mapping = {
u'nan': u'NaN',
u'inf': u'Infinity',
u'-inf': u'-Infinity',
}
def _new_smart_decode(obj):
if isinstance(obj, float):
s = unicode(obj)
return _float_str_mapping.get(s, s)
return _old_smart_decode(obj)
py4j.protocol.smart_decode = _new_smart_decode
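# For example, Normalizer(float("inf")) below sends p=inf to the JVM; Py4J would otherwise
# encode it as u'inf', which the Java side cannot parse as a Double, so the value is
# remapped to u'Infinity' by the patched smart_decode above.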
# TODO: move these helper functions into utils
_picklable_classes = [
'LinkedList',
'SparseVector',
'DenseVector',
'DenseMatrix',
'Rating',
'LabeledPoint',
]
def _py2java(sc, a):
""" Convert Python object into Java """
if isinstance(a, RDD):
a = _to_java_object_rdd(a)
elif not isinstance(a, (int, long, float, bool, basestring)):
bytes = bytearray(PickleSerializer().dumps(a))
a = sc._jvm.SerDe.loads(bytes)
return a
def _java2py(sc, r):
if isinstance(r, JavaObject):
clsName = r.getClass().getSimpleName()
if clsName in ("RDD", "JavaRDD"):
if clsName == "RDD":
r = r.toJavaRDD()
jrdd = sc._jvm.SerDe.javaToPython(r)
return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))
__all__ = ['Word2Vec', 'Word2VecModel']
elif clsName in _picklable_classes:
r = sc._jvm.SerDe.dumps(r)
if isinstance(r, bytearray):
r = PickleSerializer().loads(str(r))
return r
class Word2VecModel(object):
def _callJavaFunc(sc, func, *args):
""" Call Java Function
"""
class for Word2Vec model
args = [_py2java(sc, a) for a in args]
return _java2py(sc, func(*args))
def _callAPI(sc, name, *args):
""" Call API in PythonMLLibAPI
"""
def __init__(self, sc, java_model):
api = getattr(sc._jvm.PythonMLLibAPI(), name)
return _callJavaFunc(sc, api, *args)
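# Illustrative call (mirrors usage later in this file): _callAPI(sc, "normalizeVector", p, vector)
# looks up PythonMLLibAPI.normalizeVector on the JVM, converts each argument with _py2java,
# invokes the stub, and converts the result back to a Python object with _java2py.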
class VectorTransformer(object):
"""
:: DeveloperApi ::
Base class for transformations of a vector or an RDD of vectors
"""
def transform(self, vector):
"""
Applies transformation on a vector.
:param vector: vector to be transformed.
"""
raise NotImplementedError
class Normalizer(VectorTransformer):
"""
:: Experimental ::
Normalizes samples individually to unit L\ :sup:`p`\ norm.
For any 1 <= `p` < float('inf'), normalizes samples using
sum(abs(vector) :sup:`p`) :sup:`(1/p)` as the norm.
For `p` = float('inf'), max(abs(vector)) will be used as the norm for normalization.
>>> v = Vectors.dense(range(3))
>>> nor = Normalizer(1)
>>> nor.transform(v)
DenseVector([0.0, 0.3333, 0.6667])
>>> rdd = sc.parallelize([v])
>>> nor.transform(rdd).collect()
[DenseVector([0.0, 0.3333, 0.6667])]
>>> nor2 = Normalizer(float("inf"))
>>> nor2.transform(v)
DenseVector([0.0, 0.5, 1.0])
"""
def __init__(self, p=2.0):
"""
:param sc: Spark context
:param java_model: Handle to Java model object
:param p: Normalization in L^p^ space, p = 2 by default.
"""
assert p >= 1.0, "p should be greater than or equal to 1.0"
self.p = float(p)
def transform(self, vector):
"""
Applies unit length normalization on a vector.
:param vector: vector to be normalized.
:return: normalized vector. If the norm of the input is zero, it
will return the input vector.
"""
sc = SparkContext._active_spark_context
assert sc is not None, "SparkContext should be initialized first"
return _callAPI(sc, "normalizeVector", self.p, vector)
class JavaModelWrapper(VectorTransformer):
"""
Wrapper for the model in JVM
"""
def __init__(self, sc, java_model):
self._sc = sc
self._java_model = java_model
def __del__(self):
self._sc._gateway.detach(self._java_model)
def transform(self, word):
def transform(self, dataset):
return _callJavaFunc(self._sc, self._java_model.transform, dataset)
class StandardScalerModel(JavaModelWrapper):
"""
:: Experimental ::
Represents a StandardScaler model that can transform vectors.
"""
def transform(self, vector):
"""
:param word: a word
:return: vector representation of word
Applies standardization transformation on a vector.
:param vector: Vector to be standardized.
:return: Standardized vector. If the variance of a column is zero,
it will return default `0.0` for the column with zero variance.
"""
return JavaModelWrapper.transform(self, vector)
class StandardScaler(object):
"""
:: Experimental ::
Standardizes features by removing the mean and scaling to unit
variance using column summary statistics on the samples in the
training set.
>>> vs = [Vectors.dense([-2.0, 2.3, 0]), Vectors.dense([3.8, 0.0, 1.9])]
>>> dataset = sc.parallelize(vs)
>>> standardizer = StandardScaler(True, True)
>>> model = standardizer.fit(dataset)
>>> result = model.transform(dataset)
>>> for r in result.collect(): r
DenseVector([-0.7071, 0.7071, -0.7071])
DenseVector([0.7071, -0.7071, 0.7071])
"""
def __init__(self, withMean=False, withStd=True):
"""
:param withMean: False by default. Centers the data with mean
before scaling. It will build a dense output, so this
does not work on sparse input and will raise an exception.
:param withStd: True by default. Scales the data to unit standard
deviation.
"""
if not (withMean or withStd):
warnings.warn("Both withMean and withStd are false. The model does nothing.")
self.withMean = withMean
self.withStd = withStd
def fit(self, dataset):
"""
Computes the mean and variance and stores as a model to be used for later scaling.
:param dataset: The data used to compute the mean and variance to build
the transformation model.
:return: a StandardScalerModel
"""
sc = dataset.context
jmodel = _callAPI(sc, "fitStandardScaler", self.withMean, self.withStd, dataset)
return StandardScalerModel(sc, jmodel)
class HashingTF(object):
"""
:: Experimental ::
Maps a sequence of terms to their term frequencies using the hashing trick.
Note: the terms must be hashable (they cannot be dict/set/list, etc.).
>>> htf = HashingTF(100)
>>> doc = "a a b b c d".split(" ")
>>> htf.transform(doc)
SparseVector(100, {1: 1.0, 14: 1.0, 31: 2.0, 44: 2.0})
"""
def __init__(self, numFeatures=1 << 20):
"""
:param numFeatures: number of features (default: 2^20)
"""
self.numFeatures = numFeatures
def indexOf(self, term):
""" Returns the index of the input term. """
return hash(term) % self.numFeatures
def transform(self, document):
"""
Transforms the input document (list of terms) to a term frequency vector,
or transforms an RDD of documents to an RDD of term frequency vectors.
"""
if isinstance(document, RDD):
return document.map(self.transform)
freq = {}
for term in document:
i = self.indexOf(term)
freq[i] = freq.get(i, 0) + 1.0
return Vectors.sparse(self.numFeatures, freq.items())
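# Illustrative usage (see the class doctest above): transform(["a", "a", "b"]) returns a single
# SparseVector of term counts, while transform(rdd_of_documents) maps the same logic over the
# RDD and returns an RDD of SparseVectors.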
class IDFModel(JavaModelWrapper):
"""
Represents an IDF model that can transform term frequency vectors.
"""
def transform(self, dataset):
"""
Transforms term frequency (TF) vectors to TF-IDF vectors.
If `minDocFreq` was set for the IDF calculation,
the terms which occur in fewer than `minDocFreq`
documents will have an entry of 0.
:param dataset: an RDD of term frequency vectors
:return: an RDD of TF-IDF vectors
"""
return JavaModelWrapper.transform(self, dataset)
class IDF(object):
"""
:: Experimental ::
Inverse document frequency (IDF).
The standard formulation is used: `idf = log((m + 1) / (d(t) + 1))`,
where `m` is the total number of documents and `d(t)` is the number
of documents that contain term `t`.
This implementation supports filtering out terms which do not appear
in a minimum number of documents (controlled by the variable `minDocFreq`).
For terms that are not in at least `minDocFreq` documents, the IDF is
set to 0, resulting in TF-IDF values of 0.
>>> n = 4
>>> freqs = [Vectors.sparse(n, (1, 3), (1.0, 2.0)),
... Vectors.dense([0.0, 1.0, 2.0, 3.0]),
... Vectors.sparse(n, [1], [1.0])]
>>> data = sc.parallelize(freqs)
>>> idf = IDF()
>>> model = idf.fit(data)
>>> tfidf = model.transform(data)
>>> for r in tfidf.collect(): r
SparseVector(4, {1: 0.0, 3: 0.5754})
DenseVector([0.0, 0.0, 1.3863, 0.863])
SparseVector(4, {1: 0.0})
"""
def __init__(self, minDocFreq=0):
"""
:param minDocFreq: minimum number of documents in which a term
should appear for filtering
"""
self.minDocFreq = minDocFreq
def fit(self, dataset):
"""
Computes the inverse document frequency.
:param dataset: an RDD of term frequency vectors
"""
sc = dataset.context
jmodel = _callAPI(sc, "fitIDF", self.minDocFreq, dataset)
return IDFModel(sc, jmodel)
class Word2VecModel(JavaModelWrapper):
"""
class for Word2Vec model
"""
def transform(self, word):
"""
Transforms a word to its vector representation
Note: local use only
:param word: a word
:return: vector representation of word(s)
"""
# TODO: make transform usable in RDD operations from python side
result = self._java_model.transform(word)
return PickleSerializer().loads(str(self._sc._jvm.SerDe.dumps(result)))
try:
return _callJavaFunc(self._sc, self._java_model.transform, word)
except Py4JJavaError:
raise ValueError("%s not found" % word)
def findSynonyms(self, x, num):
def findSynonyms(self, word, num):
"""
:param x: a word or a vector representation of word
Find synonyms of a word
:param word: a word or a vector representation of word
:param num: number of synonyms to find
:return: array of (word, cosineSimilarity)
Find synonyms of a word
Note: local use only
"""
# TODO: make findSynonyms usable in RDD operations from python side
ser = PickleSerializer()
if type(x) == str:
jlist = self._java_model.findSynonyms(x, num)
else:
bytes = bytearray(ser.dumps(_convert_to_vector(x)))
vec = self._sc._jvm.SerDe.loads(bytes)
jlist = self._java_model.findSynonyms(vec, num)
words, similarity = ser.loads(str(self._sc._jvm.SerDe.dumps(jlist)))
words, similarity = _callJavaFunc(self._sc, self._java_model.findSynonyms, word, num)
return zip(words, similarity)
@@ -85,6 +383,7 @@ class Word2Vec(object):
We use the skip-gram model in our implementation and the hierarchical softmax
method to train it. The variable names in the implementation
match the original C implementation.
For original C implementation, see https://code.google.com/p/word2vec/
For research papers, see
Efficient Estimation of Word Representations in Vector Space
@@ -95,33 +394,26 @@ class Word2Vec(object):
>>> localDoc = [sentence, sentence]
>>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" "))
>>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc)
>>> syms = model.findSynonyms("a", 2)
>>> str(syms[0][0])
'b'
>>> str(syms[1][0])
'c'
>>> len(syms)
2
>>> [s[0] for s in syms]
[u'b', u'c']
>>> vec = model.transform("a")
>>> len(vec)
10
>>> syms = model.findSynonyms(vec, 2)
>>> str(syms[0][0])
'b'
>>> str(syms[1][0])
'c'
>>> len(syms)
2
>>> [s[0] for s in syms]
[u'b', u'c']
"""
def __init__(self):
"""
Construct Word2Vec instance
"""
import random # this can't be on the top because of mllib.random
self.vectorSize = 100
self.learningRate = 0.025
self.numPartitions = 1
self.numIterations = 1
self.seed = 42L
self.seed = random.randint(0, sys.maxint)
def setVectorSize(self, vectorSize):
"""
@@ -164,20 +456,13 @@ class Word2Vec(object):
Computes the vector representation of each word in vocabulary.
:param data: training data. RDD of subtype of Iterable[String]
:return: python Word2VecModel instance
:return: Word2VecModel instance
"""
sc = data.context
ser = PickleSerializer()
vectorSize = self.vectorSize
learningRate = self.learningRate
numPartitions = self.numPartitions
numIterations = self.numIterations
seed = self.seed
model = sc._jvm.PythonMLLibAPI().trainWord2Vec(
_to_java_object_rdd(data), vectorSize,
learningRate, numPartitions, numIterations, seed)
return Word2VecModel(sc, model)
jmodel = _callAPI(sc, "trainWord2Vec", data, int(self.vectorSize),
float(self.learningRate), int(self.numPartitions),
int(self.numIterations), long(self.seed))
return Word2VecModel(sc, jmodel)
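# Illustrative usage (from the class doctest above): Word2Vec().setVectorSize(10).setSeed(42L).fit(doc)
# returns a Word2VecModel whose transform() and findSynonyms() run locally on the driver
# (see the "local use only" notes in Word2VecModel).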
def _test():
@@ -191,4 +476,8 @@ def _test():
exit(-1)
if __name__ == "__main__":
# remove current path from list of search paths to avoid importing mllib.random
# for C{import random}, which is done in an external dependency of pyspark during doctests.
import sys
sys.path.pop(0)
_test()
@@ -111,6 +111,13 @@ def _vector_size(v):
raise TypeError("Cannot treat type %s as a vector" % type(v))
def _format_float(f, digits=4):
s = str(round(f, digits))
if '.' in s:
s = s[:s.index('.') + 1 + digits]
return s
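# For example, _format_float(1.0 / 3) == '0.3333' and _format_float(2.0) == '2.0'; this keeps
# the __repr__ output of DenseVector and SparseVector below short.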
class Vector(object):
"""
Abstract class for DenseVector and SparseVector
@@ -228,7 +235,7 @@ class DenseVector(Vector):
return "[" + ",".join([str(v) for v in self.array]) + "]"
def __repr__(self):
return "DenseVector(%r)" % self.array
return "DenseVector([%s])" % (', '.join(_format_float(i) for i in self.array))
def __eq__(self, other):
return isinstance(other, DenseVector) and self.array == other.array
@@ -416,7 +423,7 @@ class SparseVector(Vector):
Returns a copy of this SparseVector as a 1-dimensional NumPy array.
"""
arr = np.zeros((self.size,), dtype=np.float64)
for i in xrange(self.indices.size):
for i in xrange(len(self.indices)):
arr[self.indices[i]] = self.values[i]
return arr
@@ -431,7 +438,8 @@ class SparseVector(Vector):
def __repr__(self):
inds = self.indices
vals = self.values
entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))])
entries = ", ".join(["{0}: {1}".format(inds[i], _format_float(vals[i]))
for i in xrange(len(inds))])
return "SparseVector({0}, {{{1}}})".format(self.size, entries)
def __eq__(self, other):
@@ -491,7 +499,7 @@ class Vectors(object):
returns a NumPy array.
>>> Vectors.dense([1, 2, 3])
DenseVector(array('d', [1.0, 2.0, 3.0]))
DenseVector([1.0, 2.0, 3.0])
"""
return DenseVector(elements)