From 08c1972a0661d42f300520cc6e5fb31023de093b Mon Sep 17 00:00:00 2001 From: Yun Ni <yunn@uber.com> Date: Wed, 15 Feb 2017 16:26:05 -0800 Subject: [PATCH] [SPARK-18080][ML][PYTHON] Python API & Examples for Locality Sensitive Hashing ## What changes were proposed in this pull request? This pull request includes python API and examples for LSH. The API changes was based on yanboliang 's PR #15768 and resolved conflicts and API changes on the Scala API. The examples are consistent with Scala examples of MinHashLSH and BucketedRandomProjectionLSH. ## How was this patch tested? API and examples are tested using spark-submit: `bin/spark-submit examples/src/main/python/ml/min_hash_lsh.py` `bin/spark-submit examples/src/main/python/ml/bucketed_random_projection_lsh.py` User guide changes are generated and manually inspected: `SKIP_API=1 jekyll build` Author: Yun Ni <yunn@uber.com> Author: Yanbo Liang <ybliang8@gmail.com> Author: Yunni <Euler57721@gmail.com> Closes #16715 from Yunni/spark-18080. --- docs/ml-features.md | 17 + ...avaBucketedRandomProjectionLSHExample.java | 38 ++- .../examples/ml/JavaMinHashLSHExample.java | 57 +++- .../bucketed_random_projection_lsh_example.py | 81 +++++ .../main/python/ml/min_hash_lsh_example.py | 81 +++++ .../BucketedRandomProjectionLSHExample.scala | 39 ++- .../spark/examples/ml/MinHashLSHExample.scala | 43 ++- .../org/apache/spark/ml/feature/LSH.scala | 7 +- python/pyspark/ml/feature.py | 291 ++++++++++++++++++ 9 files changed, 601 insertions(+), 53 deletions(-) create mode 100644 examples/src/main/python/ml/bucketed_random_projection_lsh_example.py create mode 100644 examples/src/main/python/ml/min_hash_lsh_example.py diff --git a/docs/ml-features.md b/docs/ml-features.md index 13d97a2290..57605bafbf 100644 --- a/docs/ml-features.md +++ b/docs/ml-features.md @@ -1558,6 +1558,15 @@ for more details on the API. {% include_example java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java %} </div> + +<div data-lang="python" markdown="1"> + +Refer to the [BucketedRandomProjectionLSH Python docs](api/python/pyspark.ml.html#pyspark.ml.feature.BucketedRandomProjectionLSH) +for more details on the API. + +{% include_example python/ml/bucketed_random_projection_lsh_example.py %} +</div> + </div> ### MinHash for Jaccard Distance @@ -1590,4 +1599,12 @@ for more details on the API. {% include_example java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java %} </div> + +<div data-lang="python" markdown="1"> + +Refer to the [MinHashLSH Python docs](api/python/pyspark.ml.html#pyspark.ml.feature.MinHashLSH) +for more details on the API. + +{% include_example python/ml/min_hash_lsh_example.py %} +</div> </div> diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java index ca3ee5a285..4594e3462b 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java @@ -35,8 +35,15 @@ import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; + +import static org.apache.spark.sql.functions.col; // $example off$ +/** + * An example demonstrating BucketedRandomProjectionLSH. 
+ * Run with: + * bin/run-example org.apache.spark.examples.ml.JavaBucketedRandomProjectionLSHExample + */ public class JavaBucketedRandomProjectionLSHExample { public static void main(String[] args) { SparkSession spark = SparkSession @@ -61,7 +68,7 @@ public class JavaBucketedRandomProjectionLSHExample { StructType schema = new StructType(new StructField[]{ new StructField("id", DataTypes.IntegerType, false, Metadata.empty()), - new StructField("keys", new VectorUDT(), false, Metadata.empty()) + new StructField("features", new VectorUDT(), false, Metadata.empty()) }); Dataset<Row> dfA = spark.createDataFrame(dataA, schema); Dataset<Row> dfB = spark.createDataFrame(dataB, schema); @@ -71,26 +78,31 @@ public class JavaBucketedRandomProjectionLSHExample { BucketedRandomProjectionLSH mh = new BucketedRandomProjectionLSH() .setBucketLength(2.0) .setNumHashTables(3) - .setInputCol("keys") - .setOutputCol("values"); + .setInputCol("features") + .setOutputCol("hashes"); BucketedRandomProjectionLSHModel model = mh.fit(dfA); // Feature Transformation + System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':"); model.transform(dfA).show(); - // Cache the transformed columns - Dataset<Row> transformedA = model.transform(dfA).cache(); - Dataset<Row> transformedB = model.transform(dfB).cache(); - // Approximate similarity join - model.approxSimilarityJoin(dfA, dfB, 1.5).show(); - model.approxSimilarityJoin(transformedA, transformedB, 1.5).show(); - // Self Join - model.approxSimilarityJoin(dfA, dfA, 2.5).filter("datasetA.id < datasetB.id").show(); + // Compute the locality sensitive hashes for the input rows, then perform approximate + // similarity join. + // We could avoid computing hashes by passing in the already-transformed dataset, e.g. + // `model.approxSimilarityJoin(transformedA, transformedB, 1.5)` + System.out.println("Approximately joining dfA and dfB on distance smaller than 1.5:"); + model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance") + .select(col("datasetA.id").alias("idA"), + col("datasetB.id").alias("idB"), + col("EuclideanDistance")).show(); - // Approximate nearest neighbor search + // Compute the locality sensitive hashes for the input rows, then perform approximate nearest + // neighbor search. + // We could avoid computing hashes by passing in the already-transformed dataset, e.g. 
+ // `model.approxNearestNeighbors(transformedA, key, 2)` + System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:"); model.approxNearestNeighbors(dfA, key, 2).show(); - model.approxNearestNeighbors(transformedA, key, 2).show(); // $example off$ spark.stop(); diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java index 9dbbf6d117..0aace46939 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.spark.ml.feature.MinHashLSH; import org.apache.spark.ml.feature.MinHashLSHModel; +import org.apache.spark.ml.linalg.Vector; import org.apache.spark.ml.linalg.VectorUDT; import org.apache.spark.ml.linalg.Vectors; import org.apache.spark.sql.Dataset; @@ -34,8 +35,15 @@ import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; + +import static org.apache.spark.sql.functions.col; // $example off$ +/** + * An example demonstrating MinHashLSH. + * Run with: + * bin/run-example org.apache.spark.examples.ml.JavaMinHashLSHExample + */ public class JavaMinHashLSHExample { public static void main(String[] args) { SparkSession spark = SparkSession @@ -44,25 +52,58 @@ public class JavaMinHashLSHExample { .getOrCreate(); // $example on$ - List<Row> data = Arrays.asList( + List<Row> dataA = Arrays.asList( RowFactory.create(0, Vectors.sparse(6, new int[]{0, 1, 2}, new double[]{1.0, 1.0, 1.0})), RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 4}, new double[]{1.0, 1.0, 1.0})), RowFactory.create(2, Vectors.sparse(6, new int[]{0, 2, 4}, new double[]{1.0, 1.0, 1.0})) ); + List<Row> dataB = Arrays.asList( + RowFactory.create(0, Vectors.sparse(6, new int[]{1, 3, 5}, new double[]{1.0, 1.0, 1.0})), + RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 5}, new double[]{1.0, 1.0, 1.0})), + RowFactory.create(2, Vectors.sparse(6, new int[]{1, 2, 4}, new double[]{1.0, 1.0, 1.0})) + ); + StructType schema = new StructType(new StructField[]{ new StructField("id", DataTypes.IntegerType, false, Metadata.empty()), - new StructField("keys", new VectorUDT(), false, Metadata.empty()) + new StructField("features", new VectorUDT(), false, Metadata.empty()) }); - Dataset<Row> dataFrame = spark.createDataFrame(data, schema); + Dataset<Row> dfA = spark.createDataFrame(dataA, schema); + Dataset<Row> dfB = spark.createDataFrame(dataB, schema); + + int[] indices = {1, 3}; + double[] values = {1.0, 1.0}; + Vector key = Vectors.sparse(6, indices, values); MinHashLSH mh = new MinHashLSH() - .setNumHashTables(1) - .setInputCol("keys") - .setOutputCol("values"); + .setNumHashTables(5) + .setInputCol("features") + .setOutputCol("hashes"); + + MinHashLSHModel model = mh.fit(dfA); + + // Feature Transformation + System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':"); + model.transform(dfA).show(); + + // Compute the locality sensitive hashes for the input rows, then perform approximate + // similarity join. + // We could avoid computing hashes by passing in the already-transformed dataset, e.g. 
+ // `model.approxSimilarityJoin(transformedA, transformedB, 0.6)` + System.out.println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:"); + model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance") + .select(col("datasetA.id").alias("idA"), + col("datasetB.id").alias("idB"), + col("JaccardDistance")).show(); - MinHashLSHModel model = mh.fit(dataFrame); - model.transform(dataFrame).show(); + // Compute the locality sensitive hashes for the input rows, then perform approximate nearest + // neighbor search. + // We could avoid computing hashes by passing in the already-transformed dataset, e.g. + // `model.approxNearestNeighbors(transformedA, key, 2)` + // It may return less than 2 rows when not enough approximate near-neighbor candidates are + // found. + System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:"); + model.approxNearestNeighbors(dfA, key, 2).show(); // $example off$ spark.stop(); diff --git a/examples/src/main/python/ml/bucketed_random_projection_lsh_example.py b/examples/src/main/python/ml/bucketed_random_projection_lsh_example.py new file mode 100644 index 0000000000..1b7a458125 --- /dev/null +++ b/examples/src/main/python/ml/bucketed_random_projection_lsh_example.py @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +from __future__ import print_function + +# $example on$ +from pyspark.ml.feature import BucketedRandomProjectionLSH +from pyspark.ml.linalg import Vectors +from pyspark.sql.functions import col +# $example off$ +from pyspark.sql import SparkSession + +""" +An example demonstrating BucketedRandomProjectionLSH. +Run with: + bin/spark-submit examples/src/main/python/ml/bucketed_random_projection_lsh_example.py +""" + +if __name__ == "__main__": + spark = SparkSession \ + .builder \ + .appName("BucketedRandomProjectionLSHExample") \ + .getOrCreate() + + # $example on$ + dataA = [(0, Vectors.dense([1.0, 1.0]),), + (1, Vectors.dense([1.0, -1.0]),), + (2, Vectors.dense([-1.0, -1.0]),), + (3, Vectors.dense([-1.0, 1.0]),)] + dfA = spark.createDataFrame(dataA, ["id", "features"]) + + dataB = [(4, Vectors.dense([1.0, 0.0]),), + (5, Vectors.dense([-1.0, 0.0]),), + (6, Vectors.dense([0.0, 1.0]),), + (7, Vectors.dense([0.0, -1.0]),)] + dfB = spark.createDataFrame(dataB, ["id", "features"]) + + key = Vectors.dense([1.0, 0.0]) + + brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0, + numHashTables=3) + model = brp.fit(dfA) + + # Feature Transformation + print("The hashed dataset where hashed values are stored in the column 'hashes':") + model.transform(dfA).show() + + # Compute the locality sensitive hashes for the input rows, then perform approximate + # similarity join. 
+ # We could avoid computing hashes by passing in the already-transformed dataset, e.g. + # `model.approxSimilarityJoin(transformedA, transformedB, 1.5)` + print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:") + model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\ + .select(col("datasetA.id").alias("idA"), + col("datasetB.id").alias("idB"), + col("EuclideanDistance")).show() + + # Compute the locality sensitive hashes for the input rows, then perform approximate nearest + # neighbor search. + # We could avoid computing hashes by passing in the already-transformed dataset, e.g. + # `model.approxNearestNeighbors(transformedA, key, 2)` + print("Approximately searching dfA for 2 nearest neighbors of the key:") + model.approxNearestNeighbors(dfA, key, 2).show() + # $example off$ + + spark.stop() diff --git a/examples/src/main/python/ml/min_hash_lsh_example.py b/examples/src/main/python/ml/min_hash_lsh_example.py new file mode 100644 index 0000000000..7b1dd611a8 --- /dev/null +++ b/examples/src/main/python/ml/min_hash_lsh_example.py @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +from __future__ import print_function + +# $example on$ +from pyspark.ml.feature import MinHashLSH +from pyspark.ml.linalg import Vectors +from pyspark.sql.functions import col +# $example off$ +from pyspark.sql import SparkSession + +""" +An example demonstrating MinHashLSH. +Run with: + bin/spark-submit examples/src/main/python/ml/min_hash_lsh_example.py +""" + +if __name__ == "__main__": + spark = SparkSession \ + .builder \ + .appName("MinHashLSHExample") \ + .getOrCreate() + + # $example on$ + dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),), + (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),), + (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)] + dfA = spark.createDataFrame(dataA, ["id", "features"]) + + dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),), + (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),), + (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)] + dfB = spark.createDataFrame(dataB, ["id", "features"]) + + key = Vectors.sparse(6, [1, 3], [1.0, 1.0]) + + mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5) + model = mh.fit(dfA) + + # Feature Transformation + print("The hashed dataset where hashed values are stored in the column 'hashes':") + model.transform(dfA).show() + + # Compute the locality sensitive hashes for the input rows, then perform approximate + # similarity join. + # We could avoid computing hashes by passing in the already-transformed dataset, e.g. 
+ # `model.approxSimilarityJoin(transformedA, transformedB, 0.6)` + print("Approximately joining dfA and dfB on distance smaller than 0.6:") + model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\ + .select(col("datasetA.id").alias("idA"), + col("datasetB.id").alias("idB"), + col("JaccardDistance")).show() + + # Compute the locality sensitive hashes for the input rows, then perform approximate nearest + # neighbor search. + # We could avoid computing hashes by passing in the already-transformed dataset, e.g. + # `model.approxNearestNeighbors(transformedA, key, 2)` + # It may return less than 2 rows when not enough approximate near-neighbor candidates are + # found. + print("Approximately searching dfA for 2 nearest neighbors of the key:") + model.approxNearestNeighbors(dfA, key, 2).show() + + # $example off$ + + spark.stop() diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala index 686cc39d3b..654535c264 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala @@ -21,9 +21,15 @@ package org.apache.spark.examples.ml // $example on$ import org.apache.spark.ml.feature.BucketedRandomProjectionLSH import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.sql.functions.col // $example off$ import org.apache.spark.sql.SparkSession +/** + * An example demonstrating BucketedRandomProjectionLSH. + * Run with: + * bin/run-example org.apache.spark.examples.ml.BucketedRandomProjectionLSHExample + */ object BucketedRandomProjectionLSHExample { def main(args: Array[String]): Unit = { // Creates a SparkSession @@ -38,40 +44,45 @@ object BucketedRandomProjectionLSHExample { (1, Vectors.dense(1.0, -1.0)), (2, Vectors.dense(-1.0, -1.0)), (3, Vectors.dense(-1.0, 1.0)) - )).toDF("id", "keys") + )).toDF("id", "features") val dfB = spark.createDataFrame(Seq( (4, Vectors.dense(1.0, 0.0)), (5, Vectors.dense(-1.0, 0.0)), (6, Vectors.dense(0.0, 1.0)), (7, Vectors.dense(0.0, -1.0)) - )).toDF("id", "keys") + )).toDF("id", "features") val key = Vectors.dense(1.0, 0.0) val brp = new BucketedRandomProjectionLSH() .setBucketLength(2.0) .setNumHashTables(3) - .setInputCol("keys") - .setOutputCol("values") + .setInputCol("features") + .setOutputCol("hashes") val model = brp.fit(dfA) // Feature Transformation + println("The hashed dataset where hashed values are stored in the column 'hashes':") model.transform(dfA).show() - // Cache the transformed columns - val transformedA = model.transform(dfA).cache() - val transformedB = model.transform(dfB).cache() - // Approximate similarity join - model.approxSimilarityJoin(dfA, dfB, 1.5).show() - model.approxSimilarityJoin(transformedA, transformedB, 1.5).show() - // Self Join - model.approxSimilarityJoin(dfA, dfA, 2.5).filter("datasetA.id < datasetB.id").show() + // Compute the locality sensitive hashes for the input rows, then perform approximate + // similarity join. + // We could avoid computing hashes by passing in the already-transformed dataset, e.g. 
+ // `model.approxSimilarityJoin(transformedA, transformedB, 1.5)` + println("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:") + model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance") + .select(col("datasetA.id").alias("idA"), + col("datasetB.id").alias("idB"), + col("EuclideanDistance")).show() - // Approximate nearest neighbor search + // Compute the locality sensitive hashes for the input rows, then perform approximate nearest + // neighbor search. + // We could avoid computing hashes by passing in the already-transformed dataset, e.g. + // `model.approxNearestNeighbors(transformedA, key, 2)` + println("Approximately searching dfA for 2 nearest neighbors of the key:") model.approxNearestNeighbors(dfA, key, 2).show() - model.approxNearestNeighbors(transformedA, key, 2).show() // $example off$ spark.stop() diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala index f4fc3cf411..6c1e22268a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala @@ -21,9 +21,15 @@ package org.apache.spark.examples.ml // $example on$ import org.apache.spark.ml.feature.MinHashLSH import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.sql.functions.col // $example off$ import org.apache.spark.sql.SparkSession +/** + * An example demonstrating MinHashLSH. + * Run with: + * bin/run-example org.apache.spark.examples.ml.MinHashLSHExample + */ object MinHashLSHExample { def main(args: Array[String]): Unit = { // Creates a SparkSession @@ -37,38 +43,45 @@ object MinHashLSHExample { (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))), (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))), (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0)))) - )).toDF("id", "keys") + )).toDF("id", "features") val dfB = spark.createDataFrame(Seq( (3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))), (4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))), (5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0)))) - )).toDF("id", "keys") + )).toDF("id", "features") val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0))) val mh = new MinHashLSH() - .setNumHashTables(3) - .setInputCol("keys") - .setOutputCol("values") + .setNumHashTables(5) + .setInputCol("features") + .setOutputCol("hashes") val model = mh.fit(dfA) // Feature Transformation + println("The hashed dataset where hashed values are stored in the column 'hashes':") model.transform(dfA).show() - // Cache the transformed columns - val transformedA = model.transform(dfA).cache() - val transformedB = model.transform(dfB).cache() - // Approximate similarity join - model.approxSimilarityJoin(dfA, dfB, 0.6).show() - model.approxSimilarityJoin(transformedA, transformedB, 0.6).show() - // Self Join - model.approxSimilarityJoin(dfA, dfA, 0.6).filter("datasetA.id < datasetB.id").show() + // Compute the locality sensitive hashes for the input rows, then perform approximate + // similarity join. + // We could avoid computing hashes by passing in the already-transformed dataset, e.g. 
+ // `model.approxSimilarityJoin(transformedA, transformedB, 0.6)` + println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:") + model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance") + .select(col("datasetA.id").alias("idA"), + col("datasetB.id").alias("idB"), + col("JaccardDistance")).show() - // Approximate nearest neighbor search + // Compute the locality sensitive hashes for the input rows, then perform approximate nearest + // neighbor search. + // We could avoid computing hashes by passing in the already-transformed dataset, e.g. + // `model.approxNearestNeighbors(transformedA, key, 2)` + // It may return less than 2 rows when not enough approximate near-neighbor candidates are + // found. + println("Approximately searching dfA for 2 nearest neighbors of the key:") model.approxNearestNeighbors(dfA, key, 2).show() - model.approxNearestNeighbors(transformedA, key, 2).show() // $example off$ spark.stop() diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala index 309cc2ef52..1c9f47a0b2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala @@ -222,7 +222,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] } /** - * Join two dataset to approximately find all pairs of rows whose distance are smaller than + * Join two datasets to approximately find all pairs of rows whose distance are smaller than * the threshold. If the [[outputCol]] is missing, the method will transform the data; if the * [[outputCol]] exists, it will use the [[outputCol]]. This allows caching of the transformed * data when necessary. @@ -230,9 +230,10 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] * @param datasetA One of the datasets to join. * @param datasetB Another dataset to join. * @param threshold The threshold for the distance of row pairs. - * @param distCol Output column for storing the distance between each result row and the key. + * @param distCol Output column for storing the distance between each pair of rows. * @return A joined dataset containing pairs of rows. The original rows are in columns - * "datasetA" and "datasetB", and a distCol is added to show the distance of each pair. + * "datasetA" and "datasetB", and a column "distCol" is added to show the distance + * between each pair. */ def approxSimilarityJoin( datasetA: Dataset[_], diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 1ab42919ea..c2eafbefcd 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -28,6 +28,7 @@ from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaTransformer, _jvm from pyspark.ml.common import inherit_doc __all__ = ['Binarizer', + 'BucketedRandomProjectionLSH', 'BucketedRandomProjectionLSHModel', 'Bucketizer', 'ChiSqSelector', 'ChiSqSelectorModel', 'CountVectorizer', 'CountVectorizerModel', @@ -37,6 +38,7 @@ __all__ = ['Binarizer', 'IDF', 'IDFModel', 'IndexToString', 'MaxAbsScaler', 'MaxAbsScalerModel', + 'MinHashLSH', 'MinHashLSHModel', 'MinMaxScaler', 'MinMaxScalerModel', 'NGram', 'Normalizer', @@ -120,6 +122,196 @@ class Binarizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, Java return self.getOrDefault(self.threshold) +class LSHParams(Params): + """ + Mixin for Locality Sensitive Hashing (LSH) algorithm parameters. 
+ """ + + numHashTables = Param(Params._dummy(), "numHashTables", "number of hash tables, where " + + "increasing number of hash tables lowers the false negative rate, " + + "and decreasing it improves the running performance.", + typeConverter=TypeConverters.toInt) + + def __init__(self): + super(LSHParams, self).__init__() + + def setNumHashTables(self, value): + """ + Sets the value of :py:attr:`numHashTables`. + """ + return self._set(numHashTables=value) + + def getNumHashTables(self): + """ + Gets the value of numHashTables or its default value. + """ + return self.getOrDefault(self.numHashTables) + + +class LSHModel(JavaModel): + """ + Mixin for Locality Sensitive Hashing (LSH) models. + """ + + def approxNearestNeighbors(self, dataset, key, numNearestNeighbors, distCol="distCol"): + """ + Given a large dataset and an item, approximately find at most k items which have the + closest distance to the item. If the :py:attr:`outputCol` is missing, the method will + transform the data; if the :py:attr:`outputCol` exists, it will use that. This allows + caching of the transformed data when necessary. + + .. note:: This method is experimental and will likely change behavior in the next release. + + :param dataset: The dataset to search for nearest neighbors of the key. + :param key: Feature vector representing the item to search for. + :param numNearestNeighbors: The maximum number of nearest neighbors. + :param distCol: Output column for storing the distance between each result row and the key. + Use "distCol" as default value if it's not specified. + :return: A dataset containing at most k items closest to the key. A column "distCol" is + added to show the distance between each row and the key. + """ + return self._call_java("approxNearestNeighbors", dataset, key, numNearestNeighbors, + distCol) + + def approxSimilarityJoin(self, datasetA, datasetB, threshold, distCol="distCol"): + """ + Join two datasets to approximately find all pairs of rows whose distance are smaller than + the threshold. If the :py:attr:`outputCol` is missing, the method will transform the data; + if the :py:attr:`outputCol` exists, it will use that. This allows caching of the + transformed data when necessary. + + :param datasetA: One of the datasets to join. + :param datasetB: Another dataset to join. + :param threshold: The threshold for the distance of row pairs. + :param distCol: Output column for storing the distance between each pair of rows. Use + "distCol" as default value if it's not specified. + :return: A joined dataset containing pairs of rows. The original rows are in columns + "datasetA" and "datasetB", and a column "distCol" is added to show the distance + between each pair. + """ + return self._call_java("approxSimilarityJoin", datasetA, datasetB, threshold, distCol) + + +@inherit_doc +class BucketedRandomProjectionLSH(JavaEstimator, LSHParams, HasInputCol, HasOutputCol, HasSeed, + JavaMLReadable, JavaMLWritable): + """ + .. note:: Experimental + + LSH class for Euclidean distance metrics. + The input is dense or sparse vectors, each of which represents a point in the Euclidean + distance space. The output will be vectors of configurable dimension. Hash values in the same + dimension are calculated by the same hash function. + + .. seealso:: `Stable Distributions \ + <https://en.wikipedia.org/wiki/Locality-sensitive_hashing#Stable_distributions>`_ + .. 
seealso:: `Hashing for Similarity Search: A Survey <https://arxiv.org/abs/1408.2927>`_ + + >>> from pyspark.ml.linalg import Vectors + >>> from pyspark.sql.functions import col + >>> data = [(0, Vectors.dense([-1.0, -1.0 ]),), + ... (1, Vectors.dense([-1.0, 1.0 ]),), + ... (2, Vectors.dense([1.0, -1.0 ]),), + ... (3, Vectors.dense([1.0, 1.0]),)] + >>> df = spark.createDataFrame(data, ["id", "features"]) + >>> brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", + ... seed=12345, bucketLength=1.0) + >>> model = brp.fit(df) + >>> model.transform(df).head() + Row(id=0, features=DenseVector([-1.0, -1.0]), hashes=[DenseVector([-1.0])]) + >>> data2 = [(4, Vectors.dense([2.0, 2.0 ]),), + ... (5, Vectors.dense([2.0, 3.0 ]),), + ... (6, Vectors.dense([3.0, 2.0 ]),), + ... (7, Vectors.dense([3.0, 3.0]),)] + >>> df2 = spark.createDataFrame(data2, ["id", "features"]) + >>> model.approxNearestNeighbors(df2, Vectors.dense([1.0, 2.0]), 1).collect() + [Row(id=4, features=DenseVector([2.0, 2.0]), hashes=[DenseVector([1.0])], distCol=1.0)] + >>> model.approxSimilarityJoin(df, df2, 3.0, distCol="EuclideanDistance").select( + ... col("datasetA.id").alias("idA"), + ... col("datasetB.id").alias("idB"), + ... col("EuclideanDistance")).show() + +---+---+-----------------+ + |idA|idB|EuclideanDistance| + +---+---+-----------------+ + | 3| 6| 2.23606797749979| + +---+---+-----------------+ + ... + >>> brpPath = temp_path + "/brp" + >>> brp.save(brpPath) + >>> brp2 = BucketedRandomProjectionLSH.load(brpPath) + >>> brp2.getBucketLength() == brp.getBucketLength() + True + >>> modelPath = temp_path + "/brp-model" + >>> model.save(modelPath) + >>> model2 = BucketedRandomProjectionLSHModel.load(modelPath) + >>> model.transform(df).head().hashes == model2.transform(df).head().hashes + True + + .. versionadded:: 2.2.0 + """ + + bucketLength = Param(Params._dummy(), "bucketLength", "the length of each hash bucket, " + + "a larger bucket lowers the false negative rate.", + typeConverter=TypeConverters.toFloat) + + @keyword_only + def __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1, + bucketLength=None): + """ + __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1, + bucketLength=None) + """ + super(BucketedRandomProjectionLSH, self).__init__() + self._java_obj = \ + self._new_java_obj("org.apache.spark.ml.feature.BucketedRandomProjectionLSH", self.uid) + self._setDefault(numHashTables=1) + kwargs = self.__init__._input_kwargs + self.setParams(**kwargs) + + @keyword_only + @since("2.2.0") + def setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1, + bucketLength=None): + """ + setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1, \ + bucketLength=None) + Sets params for this BucketedRandomProjectionLSH. + """ + kwargs = self.setParams._input_kwargs + return self._set(**kwargs) + + @since("2.2.0") + def setBucketLength(self, value): + """ + Sets the value of :py:attr:`bucketLength`. + """ + return self._set(bucketLength=value) + + @since("2.2.0") + def getBucketLength(self): + """ + Gets the value of bucketLength or its default value. + """ + return self.getOrDefault(self.bucketLength) + + def _create_model(self, java_model): + return BucketedRandomProjectionLSHModel(java_model) + + +class BucketedRandomProjectionLSHModel(LSHModel, JavaMLReadable, JavaMLWritable): + """ + .. note:: Experimental + + Model fitted by :py:class:`BucketedRandomProjectionLSH`, where multiple random vectors are + stored. 
The vectors are normalized to be unit vectors and each vector is used in a hash + function: :math:`h_i(x) = floor(r_i \cdot x / bucketLength)` where :math:`r_i` is the + i-th random unit vector. The number of buckets will be `(max L2 norm of input vectors) / + bucketLength`. + + .. versionadded:: 2.2.0 + """ + + @inherit_doc class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable): """ @@ -754,6 +946,105 @@ class MaxAbsScalerModel(JavaModel, JavaMLReadable, JavaMLWritable): return self._call_java("maxAbs") +@inherit_doc +class MinHashLSH(JavaEstimator, LSHParams, HasInputCol, HasOutputCol, HasSeed, + JavaMLReadable, JavaMLWritable): + + """ + .. note:: Experimental + + LSH class for Jaccard distance. + The input can be dense or sparse vectors, but it is more efficient if it is sparse. + For example, `Vectors.sparse(10, [(2, 1.0), (3, 1.0), (5, 1.0)])` means there are 10 elements + in the space. This set contains elements 2, 3, and 5. Also, any input vector must have at + least 1 non-zero index, and all non-zero values are treated as binary "1" values. + + .. seealso:: `Wikipedia on MinHash <https://en.wikipedia.org/wiki/MinHash>`_ + + >>> from pyspark.ml.linalg import Vectors + >>> from pyspark.sql.functions import col + >>> data = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),), + ... (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),), + ... (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)] + >>> df = spark.createDataFrame(data, ["id", "features"]) + >>> mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345) + >>> model = mh.fit(df) + >>> model.transform(df).head() + Row(id=0, features=SparseVector(6, {0: 1.0, 1: 1.0, 2: 1.0}), hashes=[DenseVector([-1638925... + >>> data2 = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),), + ... (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),), + ... (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)] + >>> df2 = spark.createDataFrame(data2, ["id", "features"]) + >>> key = Vectors.sparse(6, [1, 2], [1.0, 1.0]) + >>> model.approxNearestNeighbors(df2, key, 1).collect() + [Row(id=5, features=SparseVector(6, {1: 1.0, 2: 1.0, 4: 1.0}), hashes=[DenseVector([-163892... + >>> model.approxSimilarityJoin(df, df2, 0.6, distCol="JaccardDistance").select( + ... col("datasetA.id").alias("idA"), + ... col("datasetB.id").alias("idB"), + ... col("JaccardDistance")).show() + +---+---+---------------+ + |idA|idB|JaccardDistance| + +---+---+---------------+ + | 1| 4| 0.5| + | 0| 5| 0.5| + +---+---+---------------+ + ... + >>> mhPath = temp_path + "/mh" + >>> mh.save(mhPath) + >>> mh2 = MinHashLSH.load(mhPath) + >>> mh2.getOutputCol() == mh.getOutputCol() + True + >>> modelPath = temp_path + "/mh-model" + >>> model.save(modelPath) + >>> model2 = MinHashLSHModel.load(modelPath) + >>> model.transform(df).head().hashes == model2.transform(df).head().hashes + True + + .. 
versionadded:: 2.2.0 + """ + + @keyword_only + def __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1): + """ + __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1) + """ + super(MinHashLSH, self).__init__() + self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.MinHashLSH", self.uid) + self._setDefault(numHashTables=1) + kwargs = self.__init__._input_kwargs + self.setParams(**kwargs) + + @keyword_only + @since("2.2.0") + def setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1): + """ + setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1) + Sets params for this MinHashLSH. + """ + kwargs = self.setParams._input_kwargs + return self._set(**kwargs) + + def _create_model(self, java_model): + return MinHashLSHModel(java_model) + + +class MinHashLSHModel(LSHModel, JavaMLReadable, JavaMLWritable): + """ + .. note:: Experimental + + Model produced by :py:class:`MinHashLSH`, where multiple hash functions are stored. Each + hash function is picked from the following family of hash functions, where :math:`a_i` and + :math:`b_i` are randomly chosen integers less than prime: + :math:`h_i(x) = ((x \cdot a_i + b_i) \mod prime)`. This hash family is approximately min-wise + independent according to the reference. + + .. seealso:: Tom Bohman, Colin Cooper, and Alan Frieze. "Min-wise independent linear \ + permutations." Electronic Journal of Combinatorics 7 (2000): R26. + + .. versionadded:: 2.2.0 + """ + + @inherit_doc class MinMaxScaler(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable): + """ -- GitLab
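
Note: the distances reported by the new doctests can be cross-checked without Spark. MinHashLSH treats each sparse vector as the set of its non-zero indices, so `JaccardDistance` is 1 - |A intersect B| / |A union B|, while BucketedRandomProjectionLSH reports plain Euclidean (L2) distance. A minimal plain-Python sketch (the helper functions below are illustrative only, not part of the pyspark API):

    import math

    def jaccard_distance(a, b):
        # Jaccard distance between two sets of non-zero indices.
        a, b = set(a), set(b)
        return 1.0 - len(a & b) / float(len(a | b))

    def euclidean_distance(x, y):
        # Ordinary L2 distance between two dense vectors.
        return math.sqrt(sum((xi - yi) ** 2 for xi, yi in zip(x, y)))

    # MinHashLSH doctest pairs (idA=1, idB=4) and (idA=0, idB=5): each pair shares 2 of 4 indices.
    print(jaccard_distance([2, 3, 4], [2, 3, 5]))      # 0.5, matches JaccardDistance
    print(jaccard_distance([0, 1, 2], [1, 2, 4]))      # 0.5

    # BucketedRandomProjectionLSH doctest pair (idA=3, idB=6): [1.0, 1.0] vs [3.0, 2.0].
    print(euclidean_distance([1.0, 1.0], [3.0, 2.0]))  # sqrt(5) ~ 2.2360679..., matches EuclideanDistance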