---
layout: global
title: Feature Extraction, Transformation, and Selection - SparkML
displayTitle: <a href="ml-guide.html">ML</a> - Features
---
This section covers algorithms for working with features, roughly divided into these groups:
- Extraction: Extracting features from "raw" data
- Transformation: Scaling, converting, or modifying features
- Selection: Selecting a subset from a larger set of features
**Table of Contents**

* This will become a table of contents (this text will be scraped).
{:toc}
# Feature Extractors

## TF-IDF (HashingTF and IDF)

Term Frequency-Inverse Document Frequency (TF-IDF) is a common text pre-processing step. In Spark ML, TF-IDF is separated into two parts: TF (+hashing) and IDF.
**TF**: `HashingTF` is a `Transformer` which takes sets of terms and converts those sets into fixed-length feature vectors. In text processing, a "set of terms" might be a bag of words. The algorithm combines Term Frequency (TF) counts with the hashing trick for dimensionality reduction.
**IDF**: `IDF` is an `Estimator` which fits on a dataset and produces an `IDFModel`. The `IDFModel` takes feature vectors (generally created from `HashingTF`) and scales each column. Intuitively, it down-weights columns which appear frequently in a corpus.
Please refer to the MLlib user guide on TF-IDF for more details on Term Frequency and Inverse Document Frequency.
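For quick reference, the quantities being combined are usually defined as follows (a sketch of the standard formulation; the MLlib guide referenced above gives the exact definitions and smoothing used by Spark):

$$
\mathrm{IDF}(t, D) = \log \frac{|D| + 1}{\mathrm{DF}(t, D) + 1}, \qquad \mathrm{TFIDF}(t, d, D) = \mathrm{TF}(t, d) \cdot \mathrm{IDF}(t, D)
$$

where $|D|$ is the number of documents in the corpus, $\mathrm{TF}(t, d)$ is the number of times term $t$ appears in document $d$, and $\mathrm{DF}(t, D)$ is the number of documents that contain $t$.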
In the following code segment, we start with a set of sentences. We split each sentence into words using `Tokenizer`. For each sentence (bag of words), we use `HashingTF` to hash the sentence into a feature vector. We use `IDF` to rescale the feature vectors; this generally improves performance when using text as features. Our feature vectors could then be passed to a learning algorithm.
Refer to the HashingTF Scala docs and the IDF Scala docs for more details on the API.
{% include_example scala/org/apache/spark/examples/ml/TfIdfExample.scala %}
Refer to the HashingTF Java docs and the IDF Java docs for more details on the API.
{% include_example java/org/apache/spark/examples/ml/JavaTfIdfExample.java %}
Refer to the HashingTF Python docs and the IDF Python docs for more details on the API.
{% include_example python/ml/tf_idf_example.py %}
## Word2Vec

`Word2Vec` is an `Estimator` which takes sequences of words representing documents and trains a `Word2VecModel`. The model maps each word to a unique fixed-size vector. The `Word2VecModel` transforms each document into a vector using the average of all words in the document; this vector can then be used as features for prediction, document similarity calculations, etc.
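Concretely, for a document consisting of words $w_1, \dots, w_n$ with learned word vectors $v_{w_1}, \dots, v_{w_n}$, the document vector is the element-wise mean:

$$
v_{doc} = \frac{1}{n} \sum_{i=1}^{n} v_{w_i}
$$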
Please refer to the MLlib user guide on Word2Vec for more
details.
In the following code segment, we start with a set of documents, each of which is represented as a sequence of words. For each document, we transform it into a feature vector. This feature vector could then be passed to a learning algorithm.
Refer to the Word2Vec Scala docs for more details on the API.
{% include_example scala/org/apache/spark/examples/ml/Word2VecExample.scala %}
Refer to the Word2Vec Java docs for more details on the API.
{% include_example java/org/apache/spark/examples/ml/JavaWord2VecExample.java %}
Refer to the Word2Vec Python docs for more details on the API.
{% include_example python/ml/word2vec_example.py %}
## CountVectorizer

`CountVectorizer` and `CountVectorizerModel` aim to help convert a collection of text documents to vectors of token counts. When an a-priori dictionary is not available, `CountVectorizer` can be used as an `Estimator` to extract the vocabulary and generate a `CountVectorizerModel`. The model produces sparse representations for the documents over the vocabulary, which can then be passed to other algorithms like LDA.
During the fitting process, `CountVectorizer` will select the top `vocabSize` words ordered by term frequency across the corpus. An optional parameter `minDF` also affects the fitting process by specifying the minimum number (or fraction if < 1.0) of documents a term must appear in to be included in the vocabulary.
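As a rough sketch of how these two parameters might be set together (assuming a hypothetical DataFrame `df` with an `Array[String]` column named "texts"; see the complete example below for a runnable version):

{% highlight scala %}
import org.apache.spark.ml.feature.CountVectorizer

// Fit a CountVectorizerModel from the hypothetical corpus `df`.
val cvModel = new CountVectorizer()
  .setInputCol("texts")
  .setOutputCol("vector")
  .setVocabSize(3)  // keep at most the 3 most frequent terms
  .setMinDF(2)      // a term must appear in at least 2 documents
  .fit(df)

cvModel.transform(df).select("vector").show()
{% endhighlight %}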
**Examples**

Assume that we have the following DataFrame with columns `id` and `texts`:
id | texts
----|----------
0 | Array("a", "b", "c")
1 | Array("a", "b", "b", "c", "a")
Each row in `texts` is a document of type `Array[String]`.
Invoking fit of `CountVectorizer` produces a `CountVectorizerModel` with vocabulary (a, b, c). Then the output column "vector" after transformation contains:
id | texts | vector
----|---------------------------------|---------------
0 | Array("a", "b", "c") | (3,[0,1,2],[1.0,1.0,1.0])
1 | Array("a", "b", "b", "c", "a") | (3,[0,1,2],[2.0,2.0,1.0])
Each vector represents the token counts of the document over the vocabulary.
Refer to the CountVectorizer Scala docs and the CountVectorizerModel Scala docs for more details on the API.
{% include_example scala/org/apache/spark/examples/ml/CountVectorizerExample.scala %}
Refer to the CountVectorizer Java docs and the CountVectorizerModel Java docs for more details on the API.
{% include_example java/org/apache/spark/examples/ml/JavaCountVectorizerExample.java %}
# Feature Transformers

## Tokenizer

Tokenization is the process of taking text (such as a sentence) and breaking it into individual terms (usually words). A simple `Tokenizer` class provides this functionality. The example below shows how to split sentences into sequences of words.

`RegexTokenizer` allows more advanced tokenization based on regular expression (regex) matching. By default, the parameter "pattern" (regex, default: `"\\s+"`) is used as delimiters to split the input text. Alternatively, users can set parameter "gaps" to false, indicating that the regex "pattern" denotes "tokens" rather than splitting gaps, in which case all matching occurrences are returned as the tokenization result.
Refer to the Tokenizer Scala docs and the RegexTokenizer Scala docs for more details on the API.
{% highlight scala %} import org.apache.spark.ml.feature.{Tokenizer, RegexTokenizer}
val sentenceDataFrame = sqlContext.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic,regression,models,are,neat")
)).toDF("label", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W")  // alternatively .setPattern("\\w+").setGaps(false)

val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("words", "label").take(3).foreach(println)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("words", "label").take(3).foreach(println)
{% endhighlight %}
Refer to the Tokenizer Java docs and the RegexTokenizer Java docs for more details on the API.
{% highlight java %} import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.ml.feature.RegexTokenizer; import org.apache.spark.ml.feature.Tokenizer; import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType;
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(0, "Hi I heard about Spark"),
  RowFactory.create(1, "I wish Java could use case classes"),
  RowFactory.create(2, "Logistic,regression,models,are,neat")
));
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
DataFrame sentenceDataFrame = sqlContext.createDataFrame(jrdd, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
DataFrame wordsDataFrame = tokenizer.transform(sentenceDataFrame);
for (Row r : wordsDataFrame.select("words", "label").take(3)) {
  java.util.List<String> words = r.getList(0);
  for (String word : words) System.out.print(word + " ");
  System.out.println();
}
RegexTokenizer regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W");  // alternatively .setPattern("\\w+").setGaps(false);
{% endhighlight %}
Refer to the Tokenizer Python docs and the RegexTokenizer Python docs for more details on the API.
{% highlight python %} from pyspark.ml.feature import Tokenizer, RegexTokenizer
sentenceDataFrame = sqlContext.createDataFrame([
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic,regression,models,are,neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsDataFrame = tokenizer.transform(sentenceDataFrame)
for words_label in wordsDataFrame.select("words", "label").take(3):
  print(words_label)
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)
{% endhighlight %}
## StopWordsRemover
Stop words are words which should be excluded from the input, typically because the words appear frequently and don't carry as much meaning.
`StopWordsRemover` takes as input a sequence of strings (e.g. the output of a `Tokenizer`) and drops all the stop words from the input sequences. The list of stopwords is specified by the `stopWords` parameter. We provide a list of stop words by default, accessible by calling `getStopWords` on a newly instantiated `StopWordsRemover` instance. A boolean parameter `caseSensitive` indicates if the matches should be case sensitive (false by default).
**Examples**

Assume that we have the following DataFrame with columns `id` and `raw`:
id | raw
----|----------
0 | [I, saw, the, red, baloon]
1 | [Mary, had, a, little, lamb]
Applying `StopWordsRemover` with `raw` as the input column and `filtered` as the output column, we should get the following:
id | raw | filtered
----|-----------------------------|--------------------
0 | [I, saw, the, red, balloon] | [saw, red, balloon]
1 | [Mary, had, a, little, lamb] | [Mary, little, lamb]
In `filtered`, the stop words "I", "the", "had", and "a" have been filtered out.
Refer to the StopWordsRemover Scala docs for more details on the API.
{% highlight scala %} import org.apache.spark.ml.feature.StopWordsRemover
val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")
val dataSet = sqlContext.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "balloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")
remover.transform(dataSet).show() {% endhighlight %}
Refer to the StopWordsRemover Java docs for more details on the API.
{% highlight java %} import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.ml.feature.StopWordsRemover; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType;
StopWordsRemover remover = new StopWordsRemover() .setInputCol("raw") .setOutputCol("filtered");
JavaRDD<Row> rdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(Arrays.asList("I", "saw", "the", "red", "balloon")),
  RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
));
StructType schema = new StructType(new StructField[] {
  new StructField("raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
DataFrame dataset = jsql.createDataFrame(rdd, schema);
remover.transform(dataset).show(); {% endhighlight %}
Refer to the StopWordsRemover Python docs for more details on the API.
{% highlight python %} from pyspark.ml.feature import StopWordsRemover
sentenceData = sqlContext.createDataFrame([
  (0, ["I", "saw", "the", "red", "balloon"]),
  (1, ["Mary", "had", "a", "little", "lamb"])
], ["label", "raw"])
remover = StopWordsRemover(inputCol="raw", outputCol="filtered") remover.transform(sentenceData).show(truncate=False) {% endhighlight %}
## n-gram

An n-gram is a sequence of n tokens (typically words) for some integer n. The `NGram` class can be used to transform input features into n-grams.

`NGram` takes as input a sequence of strings (e.g. the output of a `Tokenizer`). The parameter `n` is used to determine the number of terms in each n-gram. The output will consist of a sequence of n-grams where each n-gram is represented by a space-delimited string of n consecutive words. If the input sequence contains fewer than `n` strings, no output is produced.
Refer to the NGram Scala docs for more details on the API.
{% highlight scala %} import org.apache.spark.ml.feature.NGram
val wordDataFrame = sqlContext.createDataFrame(Seq( (0, Array("Hi", "I", "heard", "about", "Spark")), (1, Array("I", "wish", "Java", "could", "use", "case", "classes")), (2, Array("Logistic", "regression", "models", "are", "neat")) )).toDF("label", "words")
val ngram = new NGram().setInputCol("words").setOutputCol("ngrams")
val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.take(3).map(_.getAs[Stream[String]]("ngrams").toList).foreach(println)
{% endhighlight %}
Refer to the NGram Java docs for more details on the API.
{% highlight java %} import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.ml.feature.NGram; import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType;
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(0.0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
  RowFactory.create(1.0, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
  RowFactory.create(2.0, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
));
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
DataFrame wordDataFrame = sqlContext.createDataFrame(jrdd, schema);
NGram ngramTransformer = new NGram().setInputCol("words").setOutputCol("ngrams");
DataFrame ngramDataFrame = ngramTransformer.transform(wordDataFrame);
for (Row r : ngramDataFrame.select("ngrams", "label").take(3)) {
  java.util.List<String> ngrams = r.getList(0);
  for (String ngram : ngrams) System.out.print(ngram + " --- ");
  System.out.println();
}
{% endhighlight %}
Refer to the NGram Python docs for more details on the API.
{% highlight python %} from pyspark.ml.feature import NGram
wordDataFrame = sqlContext.createDataFrame([ (0, ["Hi", "I", "heard", "about", "Spark"]), (1, ["I", "wish", "Java", "could", "use", "case", "classes"]), (2, ["Logistic", "regression", "models", "are", "neat"]) ], ["label", "words"]) ngram = NGram(inputCol="words", outputCol="ngrams") ngramDataFrame = ngram.transform(wordDataFrame) for ngrams_label in ngramDataFrame.select("ngrams", "label").take(3): print(ngrams_label) {% endhighlight %}
## Binarizer

Binarization is the process of thresholding numerical features to binary (0/1) features.

`Binarizer` takes the common parameters `inputCol` and `outputCol`, as well as the `threshold` for binarization. Feature values greater than the threshold are binarized to 1.0; values equal to or less than the threshold are binarized to 0.0.
Refer to the Binarizer Scala docs for more details on the API.
{% highlight scala %} import org.apache.spark.ml.feature.Binarizer import org.apache.spark.sql.DataFrame
val data = Array( (0, 0.1), (1, 0.8), (2, 0.2) ) val dataFrame: DataFrame = sqlContext.createDataFrame(data).toDF("label", "feature")
val binarizer: Binarizer = new Binarizer() .setInputCol("feature") .setOutputCol("binarized_feature") .setThreshold(0.5)
val binarizedDataFrame = binarizer.transform(dataFrame) val binarizedFeatures = binarizedDataFrame.select("binarized_feature") binarizedFeatures.collect().foreach(println) {% endhighlight %}
Refer to the Binarizer Java docs for more details on the API.
{% highlight java %} import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.ml.feature.Binarizer; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType;
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(0, 0.1),
  RowFactory.create(1, 0.8),
  RowFactory.create(2, 0.2)
));
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
DataFrame continuousDataFrame = jsql.createDataFrame(jrdd, schema);
Binarizer binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5);
DataFrame binarizedDataFrame = binarizer.transform(continuousDataFrame);
DataFrame binarizedFeatures = binarizedDataFrame.select("binarized_feature");
for (Row r : binarizedFeatures.collect()) {
  Double binarized_value = r.getDouble(0);
  System.out.println(binarized_value);
}
{% endhighlight %}
Refer to the Binarizer Python docs for more details on the API.
{% highlight python %} from pyspark.ml.feature import Binarizer
continuousDataFrame = sqlContext.createDataFrame([ (0, 0.1), (1, 0.8), (2, 0.2) ], ["label", "feature"]) binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature") binarizedDataFrame = binarizer.transform(continuousDataFrame) binarizedFeatures = binarizedDataFrame.select("binarized_feature") for binarized_feature, in binarizedFeatures.collect(): print(binarized_feature) {% endhighlight %}
## PCA
PCA is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables into a set of values of linearly uncorrelated variables called principal components. A PCA class trains a model to project vectors to a low-dimensional space using PCA. The example below shows how to project 5-dimensional feature vectors into 3-dimensional principal components.
Refer to the PCA Scala docs for more details on the API.
{% highlight scala %} import org.apache.spark.ml.feature.PCA import org.apache.spark.mllib.linalg.Vectors
val data = Array( Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))), Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0), Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0) ) val df = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features") val pca = new PCA() .setInputCol("features") .setOutputCol("pcaFeatures") .setK(3) .fit(df) val pcaDF = pca.transform(df) val result = pcaDF.select("pcaFeatures") result.show() {% endhighlight %}
Refer to the PCA Java docs for more details on the API.
{% highlight java %} import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaSparkContext jsc = ...
SQLContext jsql = ...
JavaRDD<Row> data = jsc.parallelize(Arrays.asList(
  RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
  RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
  RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
));
StructType schema = new StructType(new StructField[] {
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
DataFrame df = jsql.createDataFrame(data, schema);
PCAModel pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df);
DataFrame result = pca.transform(df).select("pcaFeatures");
result.show();
{% endhighlight %}
Refer to the PCA Python docs for more details on the API.
{% highlight python %} from pyspark.ml.feature import PCA from pyspark.mllib.linalg import Vectors
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),), (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),), (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)] df = sqlContext.createDataFrame(data,["features"]) pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures") model = pca.fit(df) result = model.transform(df).select("pcaFeatures") result.show(truncate=False) {% endhighlight %}
## PolynomialExpansion
Polynomial expansion is the process of expanding your features into a polynomial space, which is formulated by an n-degree combination of original dimensions. A PolynomialExpansion class provides this functionality. The example below shows how to expand your features into a 3-degree polynomial space.
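For intuition, expanding a 2-dimensional vector `(x, y)` into a degree-3 polynomial space produces (up to the ordering used internally) the nine features `(x, x^2, x^3, y, x*y, x^2*y, y^2, x*y^2, y^3)`; note that the expansion does not include a constant 1 term.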
Refer to the PolynomialExpansion Scala docs for more details on the API.
{% highlight scala %} import org.apache.spark.ml.feature.PolynomialExpansion import org.apache.spark.mllib.linalg.Vectors
val data = Array( Vectors.dense(-2.0, 2.3), Vectors.dense(0.0, 0.0), Vectors.dense(0.6, -1.1) ) val df = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features") val polynomialExpansion = new PolynomialExpansion() .setInputCol("features") .setOutputCol("polyFeatures") .setDegree(3) val polyDF = polynomialExpansion.transform(df) polyDF.select("polyFeatures").take(3).foreach(println) {% endhighlight %}
Refer to the PolynomialExpansion Java docs for more details on the API.
{% highlight java %} import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaSparkContext jsc = ...
SQLContext jsql = ...
PolynomialExpansion polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3);
JavaRDD<Row> data = jsc.parallelize(Arrays.asList(
  RowFactory.create(Vectors.dense(-2.0, 2.3)),
  RowFactory.create(Vectors.dense(0.0, 0.0)),
  RowFactory.create(Vectors.dense(0.6, -1.1))
));
StructType schema = new StructType(new StructField[] {
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
DataFrame df = jsql.createDataFrame(data, schema);
DataFrame polyDF = polyExpansion.transform(df);
Row[] row = polyDF.select("polyFeatures").take(3);
for (Row r : row) {
  System.out.println(r.get(0));
}
{% endhighlight %}
Refer to the PolynomialExpansion Python docs for more details on the API.
{% highlight python %} from pyspark.ml.feature import PolynomialExpansion from pyspark.mllib.linalg import Vectors
df = sqlContext.createDataFrame( [(Vectors.dense([-2.0, 2.3]), ), (Vectors.dense([0.0, 0.0]), ), (Vectors.dense([0.6, -1.1]), )], ["features"]) px = PolynomialExpansion(degree=2, inputCol="features", outputCol="polyFeatures") polyDF = px.transform(df) for expanded in polyDF.select("polyFeatures").take(3): print(expanded) {% endhighlight %}
## Discrete Cosine Transform (DCT)

The Discrete Cosine Transform transforms a length $N$ real-valued sequence in the time domain into another length $N$ real-valued sequence in the frequency domain. A `DCT` class provides this functionality, implementing the DCT-II and scaling the result by $1/\sqrt{2}$ such that the representing matrix for the transform is unitary. No shift is applied to the transformed sequence (e.g. the 0th element of the transformed sequence is the 0th DCT coefficient and not the $N/2$th).
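For reference, the unitary DCT-II described here corresponds to the standard orthonormal form (refer to the API docs for the exact convention implemented):

$$
X_k = \sqrt{\frac{2}{N}} \, c_k \sum_{n=0}^{N-1} x_n \cos\left(\frac{\pi (2n + 1) k}{2N}\right), \qquad c_0 = \frac{1}{\sqrt{2}},\ c_k = 1 \text{ for } k > 0
$$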
Refer to the DCT Scala docs for more details on the API.
{% highlight scala %} import org.apache.spark.ml.feature.DCT import org.apache.spark.mllib.linalg.Vectors
val data = Seq( Vectors.dense(0.0, 1.0, -2.0, 3.0), Vectors.dense(-1.0, 2.0, 4.0, -7.0), Vectors.dense(14.0, -2.0, -5.0, 1.0)) val df = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features") val dct = new DCT() .setInputCol("features") .setOutputCol("featuresDCT") .setInverse(false) val dctDf = dct.transform(df) dctDf.select("featuresDCT").show(3) {% endhighlight %}
Refer to the DCT Java docs for more details on the API.
{% highlight java %} import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.ml.feature.DCT; import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.mllib.linalg.VectorUDT; import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType;
JavaRDD<Row> data = jsc.parallelize(Arrays.asList(
  RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
  RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
  RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
));
StructType schema = new StructType(new StructField[] {
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
DataFrame df = jsql.createDataFrame(data, schema);
DCT dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false);
DataFrame dctDf = dct.transform(df);
dctDf.select("featuresDCT").show(3);
{% endhighlight %}
## StringIndexer

`StringIndexer` encodes a string column of labels to a column of label indices. The indices are in `[0, numLabels)`, ordered by label frequencies, so the most frequent label gets index `0`. If the input column is numeric, we cast it to string and index the string values. When downstream pipeline components such as `Estimator` or `Transformer` make use of this string-indexed label, you must set the input column of the component to this string-indexed column name. In many cases, you can set the input column with `setInputCol`.
**Examples**

Assume that we have the following DataFrame with columns `id` and `category`:
id | category
----|----------
0 | a
1 | b
2 | c
3 | a
4 | a
5 | c
`category` is a string column with three labels: "a", "b", and "c".
Applying `StringIndexer` with `category` as the input column and `categoryIndex` as the output column, we should get the following:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | a | 0.0
4 | a | 0.0
5 | c | 1.0
"a" gets index 0
because it is the most frequent, followed by "c" with index 1
and "b" with
index 2
.
Refer to the StringIndexer Scala docs for more details on the API.
{% highlight scala %} import org.apache.spark.ml.feature.StringIndexer
val df = sqlContext.createDataFrame( Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")) ).toDF("id", "category") val indexer = new StringIndexer() .setInputCol("category") .setOutputCol("categoryIndex") val indexed = indexer.fit(df).transform(df) indexed.show() {% endhighlight %}
Refer to the StringIndexer Java docs for more details on the API.
{% highlight java %} import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.ml.feature.StringIndexer; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import static org.apache.spark.sql.types.DataTypes.*;
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
));
StructType schema = new StructType(new StructField[] {
  createStructField("id", IntegerType, false),
  createStructField("category", StringType, false)
});
DataFrame df = sqlContext.createDataFrame(jrdd, schema);
StringIndexer indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex");
DataFrame indexed = indexer.fit(df).transform(df);
indexed.show();
{% endhighlight %}
Refer to the StringIndexer Python docs for more details on the API.
{% highlight python %} from pyspark.ml.feature import StringIndexer
df = sqlContext.createDataFrame( [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")], ["id", "category"]) indexer = StringIndexer(inputCol="category", outputCol="categoryIndex") indexed = indexer.fit(df).transform(df) indexed.show() {% endhighlight %}
## OneHotEncoder

One-hot encoding maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.
Refer to the OneHotEncoder Scala docs for more details on the API.
{% highlight scala %} import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
val df = sqlContext.createDataFrame(Seq( (0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c") )).toDF("id", "category")
val indexer = new StringIndexer() .setInputCol("category") .setOutputCol("categoryIndex") .fit(df) val indexed = indexer.transform(df)
val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").foreach(println)
{% endhighlight %}
Refer to the OneHotEncoder Java docs for more details on the API.
{% highlight java %} import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.ml.feature.OneHotEncoder; import org.apache.spark.ml.feature.StringIndexer; import org.apache.spark.ml.feature.StringIndexerModel; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType;
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
));
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
DataFrame df = sqlContext.createDataFrame(jrdd, schema);
StringIndexerModel indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df);
DataFrame indexed = indexer.transform(df);
OneHotEncoder encoder = new OneHotEncoder() .setInputCol("categoryIndex") .setOutputCol("categoryVec"); DataFrame encoded = encoder.transform(indexed); {% endhighlight %}
Refer to the OneHotEncoder Python docs for more details on the API.
{% highlight python %} from pyspark.ml.feature import OneHotEncoder, StringIndexer
df = sqlContext.createDataFrame([ (0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c") ], ["id", "category"])
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
{% endhighlight %}
## VectorIndexer

`VectorIndexer` helps index categorical features in datasets of `Vector`s. It can both automatically decide which features are categorical and convert original values to category indices. Specifically, it does the following:

- Take an input column of type `Vector` and a parameter `maxCategories`.
- Decide which features should be categorical based on the number of distinct values, where features with at most `maxCategories` distinct values are declared categorical.
- Compute 0-based category indices for each categorical feature.
- Index categorical features and transform original feature values to indices.
Indexing categorical features allows algorithms such as Decision Trees and Tree Ensembles to treat categorical features appropriately, improving performance.
In the example below, we read in a dataset of labeled points and then use `VectorIndexer` to decide which features should be treated as categorical. We transform the categorical feature values to their indices. This transformed data could then be passed to algorithms such as `DecisionTreeRegressor` that handle categorical features.
Refer to the VectorIndexer Scala docs for more details on the API.
{% highlight scala %} import org.apache.spark.ml.feature.VectorIndexer
val data = sqlContext.read.format("libsvm") .load("data/mllib/sample_libsvm_data.txt") val indexer = new VectorIndexer() .setInputCol("features") .setOutputCol("indexed") .setMaxCategories(10) val indexerModel = indexer.fit(data) val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet println(s"Chose ${categoricalFeatures.size} categorical features: " + categoricalFeatures.mkString(", "))
// Create new column "indexed" with categorical values transformed to indices val indexedData = indexerModel.transform(data) {% endhighlight %}
Refer to the VectorIndexer Java docs for more details on the API.
{% highlight java %} import java.util.Map;
import org.apache.spark.ml.feature.VectorIndexer; import org.apache.spark.ml.feature.VectorIndexerModel; import org.apache.spark.sql.DataFrame;
DataFrame data = sqlContext.read().format("libsvm") .load("data/mllib/sample_libsvm_data.txt"); VectorIndexer indexer = new VectorIndexer() .setInputCol("features") .setOutputCol("indexed") .setMaxCategories(10); VectorIndexerModel indexerModel = indexer.fit(data); Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps(); System.out.print("Chose " + categoryMaps.size() + "categorical features:"); for (Integer feature : categoryMaps.keySet()) { System.out.print(" " + feature); } System.out.println();
// Create new column "indexed" with categorical values transformed to indices DataFrame indexedData = indexerModel.transform(data); {% endhighlight %}
Refer to the VectorIndexer Python docs for more details on the API.
{% highlight python %} from pyspark.ml.feature import VectorIndexer
data = sqlContext.read.format("libsvm") .load("data/mllib/sample_libsvm_data.txt") indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10) indexerModel = indexer.fit(data)
Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data) {% endhighlight %}
## Normalizer

`Normalizer` is a `Transformer` which transforms a dataset of `Vector` rows, normalizing each `Vector` to have unit norm. It takes parameter `p`, which specifies the p-norm used for normalization. (p = 2 by default.) This normalization can help standardize your input data and improve the behavior of learning algorithms.
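As a quick reminder of the underlying math, each row vector $x$ is divided by its p-norm, so the output is $x / \|x\|_p$ with

$$
\|x\|_p = \left( \sum_i |x_i|^p \right)^{1/p}, \qquad \|x\|_\infty = \max_i |x_i|.
$$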
The following example demonstrates how to load a dataset in libsvm format and then normalize each row to have unit $L^1$ norm and unit $L^\infty$ norm.
Refer to the Normalizer Scala docs for more details on the API.
{% highlight scala %} import org.apache.spark.ml.feature.Normalizer
val dataFrame = sqlContext.read.format("libsvm") .load("data/mllib/sample_libsvm_data.txt")
// Normalize each Vector using L^1 norm. val normalizer = new Normalizer() .setInputCol("features") .setOutputCol("normFeatures") .setP(1.0) val l1NormData = normalizer.transform(dataFrame)
// Normalize each Vector using L^\infty norm. val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity) {% endhighlight %}
Refer to the Normalizer Java docs for more details on the API.
{% highlight java %} import org.apache.spark.ml.feature.Normalizer; import org.apache.spark.sql.DataFrame;
DataFrame dataFrame = sqlContext.read().format("libsvm") .load("data/mllib/sample_libsvm_data.txt");
// Normalize each Vector using L^1 norm. Normalizer normalizer = new Normalizer() .setInputCol("features") .setOutputCol("normFeatures") .setP(1.0); DataFrame l1NormData = normalizer.transform(dataFrame);
// Normalize each Vector using L^\infty norm. DataFrame lInfNormData = normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY)); {% endhighlight %}
Refer to the Normalizer Python docs for more details on the API.
{% highlight python %} from pyspark.ml.feature import Normalizer
dataFrame = sqlContext.read.format("libsvm") .load("data/mllib/sample_libsvm_data.txt")
# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
{% endhighlight %}
## StandardScaler

`StandardScaler` transforms a dataset of `Vector` rows, normalizing each feature to have unit standard deviation and/or zero mean. It takes parameters:

- `withStd`: True by default. Scales the data to unit standard deviation.
- `withMean`: False by default. Centers the data with mean before scaling. It will build a dense output, so this does not work on sparse input and will raise an exception.
`StandardScaler` is an `Estimator` which can be `fit` on a dataset to produce a `StandardScalerModel`; this amounts to computing summary statistics. The model can then transform a `Vector` column in a dataset to have unit standard deviation and/or zero mean features.

Note that if the standard deviation of a feature is zero, it will return default `0.0` value in the `Vector` for that feature.
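Concretely, with both options enabled, a feature value $x$ in a column with sample mean $\mu$ and sample standard deviation $\sigma$ is transformed to

$$
x' = \frac{x - \mu}{\sigma},
$$

where the centering by $\mu$ is skipped when `withMean` is false and the division by $\sigma$ is skipped when `withStd` is false.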
The following example demonstrates how to load a dataset in libsvm format and then normalize each feature to have unit standard deviation.
Refer to the StandardScaler Scala docs for more details on the API.
{% highlight scala %} import org.apache.spark.ml.feature.StandardScaler
val dataFrame = sqlContext.read.format("libsvm") .load("data/mllib/sample_libsvm_data.txt") val scaler = new StandardScaler() .setInputCol("features") .setOutputCol("scaledFeatures") .setWithStd(true) .setWithMean(false)
// Compute summary statistics by fitting the StandardScaler val scalerModel = scaler.fit(dataFrame)
// Normalize each feature to have unit standard deviation. val scaledData = scalerModel.transform(dataFrame) {% endhighlight %}
Refer to the StandardScaler Java docs for more details on the API.
{% highlight java %} import org.apache.spark.ml.feature.StandardScaler; import org.apache.spark.ml.feature.StandardScalerModel; import org.apache.spark.sql.DataFrame;
DataFrame dataFrame = sqlContext.read().format("libsvm") .load("data/mllib/sample_libsvm_data.txt"); StandardScaler scaler = new StandardScaler() .setInputCol("features") .setOutputCol("scaledFeatures") .setWithStd(true) .setWithMean(false);
// Compute summary statistics by fitting the StandardScaler StandardScalerModel scalerModel = scaler.fit(dataFrame);
// Normalize each feature to have unit standard deviation. DataFrame scaledData = scalerModel.transform(dataFrame); {% endhighlight %}
Refer to the StandardScaler Python docs for more details on the API.
{% highlight python %} from pyspark.ml.feature import StandardScaler
dataFrame = sqlContext.read.format("libsvm") .load("data/mllib/sample_libsvm_data.txt") scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)
# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame) {% endhighlight %}
## MinMaxScaler

`MinMaxScaler` transforms a dataset of `Vector` rows, rescaling each feature to a specific range (often [0, 1]). It takes parameters: