diff --git a/data/mllib/sample_fpgrowth.txt b/data/mllib/sample_fpgrowth.txt new file mode 100644 index 0000000000000000000000000000000000000000..c451583e5131796daa0bc3dde2a71f1d076d18a8 --- /dev/null +++ b/data/mllib/sample_fpgrowth.txt @@ -0,0 +1,6 @@ +r z h k p +z y x w v u t s +s x o n r +x z y m t s q e +z +x z y r q t p diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaFPGrowthExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaFPGrowthExample.java index f50e802cf683cd13d849948b340256a6591941a0..36baf5868736c863fee3079e791fc760a33c2ea4 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaFPGrowthExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaFPGrowthExample.java @@ -25,32 +25,49 @@ import com.google.common.collect.Lists; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.fpm.FPGrowth; import org.apache.spark.mllib.fpm.FPGrowthModel; /** * Java example for mining frequent itemsets using FP-growth. + * Example usage: ./bin/run-example mllib.JavaFPGrowthExample ./data/mllib/sample_fpgrowth.txt */ public class JavaFPGrowthExample { public static void main(String[] args) { + String inputFile; + double minSupport = 0.3; + int numPartition = -1; + if (args.length < 1) { + System.err.println( + "Usage: JavaFPGrowth <input_file> [minSupport] [numPartition]"); + System.exit(1); + } + inputFile = args[0]; + if (args.length >= 2) { + minSupport = Double.parseDouble(args[1]); + } + if (args.length >= 3) { + numPartition = Integer.parseInt(args[2]); + } + SparkConf sparkConf = new SparkConf().setAppName("JavaFPGrowthExample"); JavaSparkContext sc = new JavaSparkContext(sparkConf); + JavaRDD<ArrayList<String>> transactions = sc.textFile(inputFile).map( + new Function<String, ArrayList<String>>() { + @Override + public ArrayList<String> call(String s) { + return Lists.newArrayList(s.split(" ")); + } + } + ); - // TODO: Read a user-specified input file. - @SuppressWarnings("unchecked") - JavaRDD<ArrayList<String>> transactions = sc.parallelize(Lists.newArrayList( - Lists.newArrayList("r z h k p".split(" ")), - Lists.newArrayList("z y x w v u t s".split(" ")), - Lists.newArrayList("s x o n r".split(" ")), - Lists.newArrayList("x z y m t s q e".split(" ")), - Lists.newArrayList("z".split(" ")), - Lists.newArrayList("x z y r q t p".split(" "))), 2); - - FPGrowth fpg = new FPGrowth() - .setMinSupport(0.3); - FPGrowthModel<String> model = fpg.run(transactions); + FPGrowthModel<String> model = new FPGrowth() + .setMinSupport(minSupport) + .setNumPartitions(numPartition) + .run(transactions); for (FPGrowth.FreqItemset<String> s: model.freqItemsets().toJavaRDD().collect()) { System.out.println("[" + Joiner.on(",").join(s.javaItems()) + "], " + s.freq()); diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala index aaae275ec5524b1e50529e736ca4bc0cb00f3fa5..13f24a1e5961049b9e8fbaee7ded051f4fe3fdbf 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala @@ -17,30 +17,61 @@ package org.apache.spark.examples.mllib +import scopt.OptionParser + import org.apache.spark.mllib.fpm.FPGrowth -import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.{SparkConf, SparkContext} /** * Example for mining frequent itemsets using FP-growth. + * Example usage: ./bin/run-example mllib.FPGrowthExample \ + * --minSupport 0.8 --numPartition 2 ./data/mllib/sample_fpgrowth.txt */ object FPGrowthExample { + case class Params( + input: String = null, + minSupport: Double = 0.3, + numPartition: Int = -1) extends AbstractParams[Params] + def main(args: Array[String]) { - val conf = new SparkConf().setAppName("FPGrowthExample") + val defaultParams = Params() + + val parser = new OptionParser[Params]("FPGrowthExample") { + head("FPGrowth: an example FP-growth app.") + opt[Double]("minSupport") + .text(s"minimal support level, default: ${defaultParams.minSupport}") + .action((x, c) => c.copy(minSupport = x)) + opt[Int]("numPartition") + .text(s"number of partition, default: ${defaultParams.numPartition}") + .action((x, c) => c.copy(numPartition = x)) + arg[String]("<input>") + .text("input paths to input data set, whose file format is that each line " + + "contains a transaction with each item in String and separated by a space") + .required() + .action((x, c) => c.copy(input = x)) + } + + parser.parse(args, defaultParams).map { params => + run(params) + }.getOrElse { + sys.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"FPGrowthExample with $params") val sc = new SparkContext(conf) + val transactions = sc.textFile(params.input).map(_.split(" ")).cache() + + println(s"Number of transactions: ${transactions.count()}") + + val model = new FPGrowth() + .setMinSupport(params.minSupport) + .setNumPartitions(params.numPartition) + .run(transactions) - // TODO: Read a user-specified input file. - val transactions = sc.parallelize(Seq( - "r z h k p", - "z y x w v u t s", - "s x o n r", - "x z y m t s q e", - "z", - "x z y r q t p").map(_.split(" ")), numSlices = 2) - - val fpg = new FPGrowth() - .setMinSupport(0.3) - val model = fpg.run(transactions) + println(s"Number of frequent itemsets: ${model.freqItemsets.count()}") model.freqItemsets.collect().foreach { itemset => println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)