Commit 66924ffa authored by Xiangrui Meng

[SPARK-9527] [MLLIB] add PrefixSpanModel and make PrefixSpan Java friendly

1. Use `PrefixSpanModel` to wrap the frequent sequences.
2. Define `FreqSequence` to wrap each frequent sequence, which contains a Java-friendly method `javaSequence`.
3. Overload `run` for Java users.
4. Add a unit test in Java to check Java compatibility; a usage sketch of the reworked API follows below.
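
A minimal Scala sketch of how the reworked API is meant to be used (not part of the commit; the input data and the SparkContext `sc` are assumed, while `PrefixSpan`, `setMinSupport`, `setMaxPatternLength`, `PrefixSpanModel`, `freqSequences`, `sequence`, and `freq` all come from the diff below):

// Sketch only: assumes an existing SparkContext `sc` and made-up input sequences.
import org.apache.spark.mllib.fpm.PrefixSpan

val sequences = sc.parallelize(Seq(
  Array(Array(1, 2), Array(3)),
  Array(Array(1), Array(3, 2), Array(1, 2)),
  Array(Array(1, 2), Array(5))
), 2)

val prefixSpan = new PrefixSpan()
  .setMinSupport(0.5)
  .setMaxPatternLength(5)

// run now returns a PrefixSpanModel instead of an RDD of (pattern, count) tuples.
val model = prefixSpan.run(sequences)

// Each frequent sequence is wrapped in a FreqSequence with `sequence` and `freq` fields.
model.freqSequences.collect().foreach { fs =>
  println(fs.sequence.map(_.mkString("{", ",", "}")).mkString("<", "", ">") + ": " + fs.freq)
}

The Java-friendly `run` overload is exercised by the new `JavaPrefixSpanSuite` further down in the diff.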

zhangjiajin feynmanliang

Author: Xiangrui Meng <meng@databricks.com>

Closes #7869 from mengxr/SPARK-9527 and squashes the following commits:

4345594 [Xiangrui Meng] add PrefixSpanModel and make PrefixSpan Java friendly
parent 8eafa2ae
@@ -17,11 +17,16 @@
package org.apache.spark.mllib.fpm
+ import java.{lang => jl, util => ju}
+ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuilder
import scala.reflect.ClassTag
import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
+ import org.apache.spark.api.java.JavaRDD
+ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -93,9 +98,9 @@ class PrefixSpan private (
/**
* Find the complete set of sequential patterns in the input sequences of itemsets.
* @param data ordered sequences of itemsets.
- * @return (sequential itemset pattern, count) tuples
+ * @return a [[PrefixSpanModel]] that contains the frequent sequences
*/
- def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): RDD[(Array[Array[Item]], Long)] = {
+ def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): PrefixSpanModel[Item] = {
val itemToInt = data.aggregate(Set[Item]())(
seqOp = { (uniqItems, item) => uniqItems ++ item.flatten.toSet },
combOp = { _ ++ _ }
@@ -113,9 +118,25 @@ class PrefixSpan private (
case (x, xs) => List(x.map(intToItem).toArray)
}
}
- results.map { case (seq: Array[Int], count: Long) =>
- (toPublicRepr(seq).toArray, count)
+ val freqSequences = results.map { case (seq: Array[Int], count: Long) =>
+ new FreqSequence[Item](toPublicRepr(seq).toArray, count)
}
+ new PrefixSpanModel[Item](freqSequences)
}
+ /**
+ * A Java-friendly version of [[run()]] that reads sequences from a [[JavaRDD]] and returns
+ * frequent sequences in a [[PrefixSpanModel]].
+ * @param data ordered sequences of itemsets stored as Java Iterable of Iterables
+ * @tparam Item item type
+ * @tparam Itemset itemset type, which is an Iterable of Items
+ * @tparam Sequence sequence type, which is an Iterable of Itemsets
+ * @return a [[PrefixSpanModel]] that contains the frequent sequences
+ */
+ def run[Item, Itemset <: jl.Iterable[Item], Sequence <: jl.Iterable[Itemset]](
+ data: JavaRDD[Sequence]): PrefixSpanModel[Item] = {
+ implicit val tag = fakeClassTag[Item]
+ run(data.rdd.map(_.asScala.map(_.asScala.toArray).toArray))
+ }
/**
@@ -287,7 +308,7 @@ class PrefixSpan private (
}
- private[fpm] object PrefixSpan {
+ object PrefixSpan {
private[fpm] val DELIMITER = -1
/** Splits an array of itemsets delimited by [[DELIMITER]]. */
@@ -313,4 +334,25 @@ private[fpm] object PrefixSpan {
// TODO: improve complexity by using partial prefixes, considering one item at a time
itemSet.subsets.filter(_ != Set.empty[Int])
}
+ /**
+ * Represents a frequent sequence.
+ * @param sequence a sequence of itemsets stored as an Array of Arrays
+ * @param freq frequency
+ * @tparam Item item type
+ */
+ class FreqSequence[Item](val sequence: Array[Array[Item]], val freq: Long) extends Serializable {
+ /**
+ * Returns sequence as a Java List of lists for Java users.
+ */
+ def javaSequence: ju.List[ju.List[Item]] = sequence.map(_.toList.asJava).toList.asJava
+ }
}
+ /**
+ * Model fitted by [[PrefixSpan]]
+ * @param freqSequences frequent sequences
+ * @tparam Item item type
+ */
+ class PrefixSpanModel[Item](val freqSequences: RDD[PrefixSpan.FreqSequence[Item]])
+ extends Serializable
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mllib.fpm;
import java.util.Arrays;
import java.util.List;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.PrefixSpan.FreqSequence;
public class JavaPrefixSpanSuite {
private transient JavaSparkContext sc;
@Before
public void setUp() {
sc = new JavaSparkContext("local", "JavaPrefixSpan");
}
@After
public void tearDown() {
sc.stop();
sc = null;
}
@Test
public void runPrefixSpan() {
JavaRDD<List<List<Integer>>> sequences = sc.parallelize(Arrays.asList(
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)),
Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1, 2)),
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5)),
Arrays.asList(Arrays.asList(6))
), 2);
PrefixSpan prefixSpan = new PrefixSpan()
.setMinSupport(0.5)
.setMaxPatternLength(5);
PrefixSpanModel<Integer> model = prefixSpan.run(sequences);
JavaRDD<FreqSequence<Integer>> freqSeqs = model.freqSequences().toJavaRDD();
List<FreqSequence<Integer>> localFreqSeqs = freqSeqs.collect();
Assert.assertEquals(5, localFreqSeqs.size());
// Check that each frequent sequence could be materialized.
for (PrefixSpan.FreqSequence<Integer> freqSeq: localFreqSeqs) {
List<List<Integer>> seq = freqSeq.javaSequence();
long freq = freqSeq.freq();
}
}
}
@@ -296,7 +296,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
5 <{1,2}> 0.75
*/
- val result = prefixspan.run(rdd)
+ val model = prefixspan.run(rdd)
val expected = Array(
(Array(Array(1)), 3L),
(Array(Array(2)), 3L),
@@ -304,7 +304,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
(Array(Array(1), Array(3)), 2L),
(Array(Array(1, 2)), 3L)
)
- compareResults(expected, result.collect())
+ compareResults(expected, model.freqSequences.collect().map(x => (x.sequence, x.freq)))
}
test("PrefixSpan String type, variable-size itemsets") {
@@ -322,7 +322,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
.setMinSupport(0.5)
.setMaxPatternLength(5)
- val result = prefixspan.run(rdd)
+ val model = prefixspan.run(rdd)
val expected = Array(
(Array(Array(1)), 3L),
(Array(Array(2)), 3L),
@@ -332,7 +332,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
).map { case (pattern, count) =>
(pattern.map(itemSet => itemSet.map(intToString)), count)
}
- compareResults(expected, result.collect())
+ compareResults(expected, model.freqSequences.collect().map(x => (x.sequence, x.freq)))
}
private def compareResults[Item](
......