From d2a9b66f6c0de89d6d16370af1c77c7f51b11d3e Mon Sep 17 00:00:00 2001
From: zhangjiajin <zhangjiajin@huawei.com>
Date: Sat, 1 Aug 2015 01:56:27 -0700
Subject: [PATCH] [SPARK-8999] [MLLIB] PrefixSpan non-temporal sequences

mengxr Extends PrefixSpan to non-temporal itemsets. Continues work by zhangjiajin

* Internal API uses List[Set[Int]] which is likely not efficient; will need to
  refactor during QA

Closes #7646

Author: zhangjiajin <zhangjiajin@huawei.com>
Author: Feynman Liang <fliang@databricks.com>
Author: zhang jiajin <zhangjiajin@huawei.com>

Closes #7818 from feynmanliang/SPARK-8999-nonTemporal and squashes the following commits:

4ded81d [Feynman Liang] Replace all filters to filter nonempty
350e67e [Feynman Liang] Code review feedback
03156ca [Feynman Liang] Fix tests, drop delimiters at boundaries of sequences
d1fe0ed [Feynman Liang] Remove comments
86ca4e5 [Feynman Liang] Fix style
7c7bf39 [Feynman Liang] Fixed itemSet sequences
6073b10 [Feynman Liang] Basic itemset functionality, failing test
1a7fb48 [Feynman Liang] Add delimiter to results
5db00aa [Feynman Liang] Working for items, not itemsets
6787716 [Feynman Liang] Working on temporal sequences
f1114b9 [Feynman Liang] Add -1 delimiter
00fe756 [Feynman Liang] Reset base files for rebase
f486dcd [zhangjiajin] change maxLocalProjDBSize and fix a bug (remove -3 from frequent items).
60a0b76 [zhangjiajin] fixed a scala style error.
740c203 [zhangjiajin] fixed a scala style error.
5785cb8 [zhangjiajin] support non-temporal sequence
a5d649d [zhangjiajin] restore original version
09dc409 [zhangjiajin] Merge branch 'master' of https://github.com/apache/spark into multiItems_2
ae8c02d [zhangjiajin] Fixed some Scala style errors.
216ab0c [zhangjiajin] Support non-temporal sequence in PrefixSpan
b572f54 [zhangjiajin] initialize file before rebase.
f06772f [zhangjiajin] fix a scala style error.
a7e50d4 [zhangjiajin] Add feature: Collect enough frequent prefixes before projection in PrefixSpan.
c1d13d0 [zhang jiajin] Delete PrefixspanSuite.scala
d9d8137 [zhang jiajin] Delete Prefixspan.scala
c6ceb63 [zhangjiajin] Add new algorithm PrefixSpan and test file.
---
 .../spark/mllib/fpm/LocalPrefixSpan.scala     |  46 ++--
 .../apache/spark/mllib/fpm/PrefixSpan.scala   | 111 +++++---
 .../spark/mllib/fpm/PrefixSpanSuite.scala     | 237 +++++++++++++++---
 3 files changed, 302 insertions(+), 92 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
index 0ea7920810..ccebf951c8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
@@ -25,7 +25,7 @@ import org.apache.spark.Logging
  * Calculate all patterns of a projected database in local.
  */
 private[fpm] object LocalPrefixSpan extends Logging with Serializable {
-
+  import PrefixSpan._
   /**
    * Calculate all patterns of a projected database.
    * @param minCount minimum count
@@ -39,12 +39,19 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
   def run(
       minCount: Long,
       maxPatternLength: Int,
-      prefixes: List[Int],
-      database: Iterable[Array[Int]]): Iterator[(List[Int], Long)] = {
-    if (prefixes.length == maxPatternLength || database.isEmpty) return Iterator.empty
-    val frequentItemAndCounts = getFreqItemAndCounts(minCount, database)
-    val filteredDatabase = database.map(x => x.filter(frequentItemAndCounts.contains))
-    frequentItemAndCounts.iterator.flatMap { case (item, count) =>
+      prefixes: List[Set[Int]],
+      database: Iterable[List[Set[Int]]]): Iterator[(List[Set[Int]], Long)] = {
+    if (prefixes.length == maxPatternLength || database.isEmpty) {
+      return Iterator.empty
+    }
+    val freqItemSetsAndCounts = getFreqItemAndCounts(minCount, database)
+    val freqItems = freqItemSetsAndCounts.keys.flatten.toSet
+    val filteredDatabase = database.map { suffix =>
+      suffix
+        .map(item => freqItems.intersect(item))
+        .filter(_.nonEmpty)
+    }
+    freqItemSetsAndCounts.iterator.flatMap { case (item, count) =>
       val newPrefixes = item :: prefixes
       val newProjected = project(filteredDatabase, item)
       Iterator.single((newPrefixes, count)) ++
@@ -54,20 +61,23 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
 
   /**
    * Calculate suffix sequence immediately after the first occurrence of an item.
-   * @param item item to get suffix after
+   * @param item itemset to get suffix after
    * @param sequence sequence to extract suffix from
    * @return suffix sequence
    */
-  def getSuffix(item: Int, sequence: Array[Int]): Array[Int] = {
-    val index = sequence.indexOf(item)
+  def getSuffix(item: Set[Int], sequence: List[Set[Int]]): List[Set[Int]] = {
+    val itemsetSeq = sequence
+    val index = itemsetSeq.indexWhere(item.subsetOf(_))
     if (index == -1) {
-      Array()
+      List()
     } else {
-      sequence.drop(index + 1)
+      itemsetSeq.drop(index + 1)
     }
   }
 
-  def project(database: Iterable[Array[Int]], prefix: Int): Iterable[Array[Int]] = {
+  def project(
+      database: Iterable[List[Set[Int]]],
+      prefix: Set[Int]): Iterable[List[Set[Int]]] = {
     database
       .map(getSuffix(prefix, _))
       .filter(_.nonEmpty)
@@ -81,14 +91,16 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
    */
   private def getFreqItemAndCounts(
       minCount: Long,
-      database: Iterable[Array[Int]]): mutable.Map[Int, Long] = {
+      database: Iterable[List[Set[Int]]]): Map[Set[Int], Long] = {
     // TODO: use PrimitiveKeyOpenHashMap
-    val counts = mutable.Map[Int, Long]().withDefaultValue(0L)
+    val counts = mutable.Map[Set[Int], Long]().withDefaultValue(0L)
     database.foreach { sequence =>
-      sequence.distinct.foreach { item =>
+      sequence.flatMap(nonemptySubsets(_)).distinct.foreach { item =>
         counts(item) += 1L
       }
     }
-    counts.filter(_._2 >= minCount)
+    counts
+      .filter { case (_, count) => count >= minCount }
+      .toMap
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index e6752332cd..22b4ddb8b3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.fpm
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ArrayBuilder
 
 import org.apache.spark.Logging
 import org.apache.spark.annotation.Experimental
@@ -44,13 +44,14 @@ import org.apache.spark.storage.StorageLevel
 class PrefixSpan private (
     private var minSupport: Double,
     private var maxPatternLength: Int) extends Logging with Serializable {
+  import PrefixSpan._
 
   /**
    * The maximum number of items allowed in a projected database before local processing. If a
    * projected database exceeds this size, another iteration of distributed PrefixSpan is run.
    */
-  // TODO: make configurable with a better default value, 10000 may be too small
-  private val maxLocalProjDBSize: Long = 10000
+  // TODO: make configurable with a better default value
+  private val maxLocalProjDBSize: Long = 32000000L
 
   /**
    * Constructs a default instance with default parameters
@@ -90,35 +91,41 @@ class PrefixSpan private (
 
   /**
    * Find the complete set of sequential patterns in the input sequences.
-   * @param sequences input data set, contains a set of sequences,
-   *                  a sequence is an ordered list of elements.
+   * @param data ordered sequences of itemsets. Items are represented by non-negative integers.
+   *             Each itemset has one or more items and is delimited by [[DELIMITER]].
    * @return a set of sequential pattern pairs,
    *         the key of pair is pattern (a list of elements),
    *         the value of pair is the pattern's count.
    */
-  def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
-    val sc = sequences.sparkContext
+  // TODO: generalize to arbitrary item-types and use mapping to Ints for internal algorithm
+  def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+    val sc = data.sparkContext
 
-    if (sequences.getStorageLevel == StorageLevel.NONE) {
+    if (data.getStorageLevel == StorageLevel.NONE) {
       logWarning("Input data is not cached.")
     }
 
+    // Use List[Set[Item]] for internal computation
+    val sequences = data.map { seq => splitSequence(seq.toList) }
+
     // Convert min support to a min number of transactions for this dataset
     val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong
 
     // (Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
     val freqItemCounts = sequences
-      .flatMap(seq => seq.distinct.map(item => (item, 1L)))
+      .flatMap(seq => seq.flatMap(nonemptySubsets(_)).distinct.map(item => (item, 1L)))
       .reduceByKey(_ + _)
-      .filter(_._2 >= minCount)
+      .filter { case (item, count) => (count >= minCount) }
       .collect()
+      .toMap
 
     // Pairs of (length 1 prefix, suffix consisting of frequent items)
    val itemSuffixPairs = {
-      val freqItems = freqItemCounts.map(_._1).toSet
+      val freqItemSets = freqItemCounts.keys.toSet
+      val freqItems = freqItemSets.flatten
       sequences.flatMap { seq =>
-        val filteredSeq = seq.filter(freqItems.contains(_))
-        freqItems.flatMap { item =>
+        val filteredSeq = seq.map(item => freqItems.intersect(item)).filter(_.nonEmpty)
+        freqItemSets.flatMap { item =>
           val candidateSuffix = LocalPrefixSpan.getSuffix(item, filteredSeq)
           candidateSuffix match {
             case suffix if !suffix.isEmpty => Some((List(item), suffix))
@@ -130,14 +137,15 @@ class PrefixSpan private (
 
     // Accumulator for the computed results to be returned, initialized to the frequent items (i.e.
     // frequent length-one prefixes)
-    var resultsAccumulator = freqItemCounts.map(x => (List(x._1), x._2))
+    var resultsAccumulator = freqItemCounts.map { case (item, count) => (List(item), count) }.toList
 
     // Remaining work to be locally and distributively processed respectfully
     var (pairsForLocal, pairsForDistributed) = partitionByProjDBSize(itemSuffixPairs)
 
     // Continue processing until no pairs for distributed processing remain (i.e. all prefixes have
-    //   projected database sizes <= `maxLocalProjDBSize`)
-    while (pairsForDistributed.count() != 0) {
+    //   projected database sizes <= `maxLocalProjDBSize`) or `maxPatternLength` is reached
+    var patternLength = 1
+    while (pairsForDistributed.count() != 0 && patternLength < maxPatternLength) {
       val (nextPatternAndCounts, nextPrefixSuffixPairs) =
         extendPrefixes(minCount, pairsForDistributed)
       pairsForDistributed.unpersist()
@@ -146,6 +154,7 @@ class PrefixSpan private (
       pairsForDistributed.persist(StorageLevel.MEMORY_AND_DISK)
       pairsForLocal ++= smallerPairsPart
       resultsAccumulator ++= nextPatternAndCounts.collect()
+      patternLength += 1 // pattern length grows one per iteration
     }
 
     // Process the small projected databases locally
@@ -153,7 +162,7 @@ class PrefixSpan private (
       minCount, sc.parallelize(pairsForLocal, 1).groupByKey())
 
     (sc.parallelize(resultsAccumulator, 1) ++ remainingResults)
-      .map { case (pattern, count) => (pattern.toArray, count) }
+      .map { case (pattern, count) => (flattenSequence(pattern.reverse).toArray, count) }
   }
 
 
@@ -163,8 +172,8 @@ class PrefixSpan private (
    * @return prefix-suffix pairs partitioned by whether their projected database size is <= or
    *         greater than [[maxLocalProjDBSize]]
    */
-  private def partitionByProjDBSize(prefixSuffixPairs: RDD[(List[Int], Array[Int])])
-    : (Array[(List[Int], Array[Int])], RDD[(List[Int], Array[Int])]) = {
+  private def partitionByProjDBSize(prefixSuffixPairs: RDD[(List[Set[Int]], List[Set[Int]])])
+    : (List[(List[Set[Int]], List[Set[Int]])], RDD[(List[Set[Int]], List[Set[Int]])]) = {
     val prefixToSuffixSize = prefixSuffixPairs
       .aggregateByKey(0)(
         seqOp = { case (count, suffix) => count + suffix.length },
@@ -176,12 +185,12 @@ class PrefixSpan private (
       .toSet
     val small = prefixSuffixPairs.filter { case (prefix, _) => smallPrefixes.contains(prefix) }
     val large = prefixSuffixPairs.filter { case (prefix, _) => !smallPrefixes.contains(prefix) }
-    (small.collect(), large)
+    (small.collect().toList, large)
   }
 
   /**
-   * Extends all prefixes by one item from their suffix and computes the resulting frequent prefixes
-   * and remaining work.
+   * Extends all prefixes by one itemset from their suffix and computes the resulting frequent
+   * prefixes and remaining work.
    * @param minCount minimum count
    * @param prefixSuffixPairs prefix (length N) and suffix pairs,
    * @return (frequent length N+1 extended prefix, count) pairs and (frequent length N+1 extended
@@ -189,15 +198,16 @@ class PrefixSpan private (
    */
   private def extendPrefixes(
       minCount: Long,
-      prefixSuffixPairs: RDD[(List[Int], Array[Int])])
-    : (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = {
+      prefixSuffixPairs: RDD[(List[Set[Int]], List[Set[Int]])])
+    : (RDD[(List[Set[Int]], Long)], RDD[(List[Set[Int]], List[Set[Int]])]) = {
 
-    // (length N prefix, item from suffix) pairs and their corresponding number of occurrences
+    // (length N prefix, itemset from suffix) pairs and their corresponding number of occurrences
     // Every (prefix :+ suffix) is guaranteed to have support exceeding `minSupport`
     val prefixItemPairAndCounts = prefixSuffixPairs
-      .flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L)) }
+      .flatMap { case (prefix, suffix) =>
+        suffix.flatMap(nonemptySubsets(_)).distinct.map(y => ((prefix, y), 1L)) }
       .reduceByKey(_ + _)
-      .filter(_._2 >= minCount)
+      .filter { case (item, count) => (count >= minCount) }
 
     // Map from prefix to set of possible next items from suffix
     val prefixToNextItems = prefixItemPairAndCounts
@@ -207,7 +217,6 @@ class PrefixSpan private (
       .collect()
       .toMap
 
-    // Frequent patterns with length N+1 and their corresponding counts
     val extendedPrefixAndCounts = prefixItemPairAndCounts
       .map { case ((prefix, item), count) => (item :: prefix, count) }
 
@@ -216,9 +225,12 @@ class PrefixSpan private (
     val extendedPrefixAndSuffix = prefixSuffixPairs
       .filter(x => prefixToNextItems.contains(x._1))
       .flatMap { case (prefix, suffix) =>
-        val frequentNextItems = prefixToNextItems(prefix)
-        val filteredSuffix = suffix.filter(frequentNextItems.contains(_))
-        frequentNextItems.flatMap { item =>
+        val frequentNextItemSets = prefixToNextItems(prefix)
+        val frequentNextItems = frequentNextItemSets.flatten
+        val filteredSuffix = suffix
+          .map(item => frequentNextItems.intersect(item))
+          .filter(_.nonEmpty)
+        frequentNextItemSets.flatMap { item =>
           LocalPrefixSpan.getSuffix(item, filteredSuffix) match {
             case suffix if !suffix.isEmpty => Some(item :: prefix, suffix)
             case _ => None
@@ -237,13 +249,38 @@ class PrefixSpan private (
    */
   private def getPatternsInLocal(
       minCount: Long,
-      data: RDD[(List[Int], Iterable[Array[Int]])]): RDD[(List[Int], Long)] = {
+      data: RDD[(List[Set[Int]], Iterable[List[Set[Int]]])]): RDD[(List[Set[Int]], Long)] = {
     data.flatMap {
-      case (prefix, projDB) =>
-        LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB)
-          .map { case (pattern: List[Int], count: Long) =>
-          (pattern.reverse, count)
-        }
+      case (prefix, projDB) => LocalPrefixSpan.run(minCount, maxPatternLength, prefix, projDB)
+    }
+  }
+
+}
+
+private[fpm] object PrefixSpan {
+  private[fpm] val DELIMITER = -1
+
+  /** Splits a sequence of itemsets delimited by [[DELIMITER]]. */
+  private[fpm] def splitSequence(sequence: List[Int]): List[Set[Int]] = {
+    sequence.span(_ != DELIMITER) match {
+      case (x, xs) if xs.length > 1 => x.toSet :: splitSequence(xs.tail)
+      case (x, xs) => List(x.toSet)
+    }
+  }
+
+  /** Flattens a sequence of itemsets into an Array, inserting[[DELIMITER]] between itemsets. */
+  private[fpm] def flattenSequence(sequence: List[Set[Int]]): List[Int] = {
+    val builder = ArrayBuilder.make[Int]()
+    for (itemSet <- sequence) {
+      builder += DELIMITER
+      builder ++= itemSet.toSeq.sorted
     }
+    builder.result().toList.drop(1) // drop trailing delimiter
+  }
+
+  /** Returns an iterator over all non-empty subsets of `itemSet` */
+  private[fpm] def nonemptySubsets(itemSet: Set[Int]): Iterator[Set[Int]] = {
+    // TODO: improve complexity by using partial prefixes, considering one item at a time
+    itemSet.subsets.filter(_ != Set.empty[Int])
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
index 6dd2dc926a..457f32670f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
 
 class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
 
-  test("PrefixSpan using Integer type") {
+  test("PrefixSpan using Integer type, singleton itemsets") {
 
     /*
       library("arulesSequences")
@@ -35,12 +35,12 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
     */
 
     val sequences = Array(
-      Array(1, 3, 4, 5),
-      Array(2, 3, 1),
-      Array(2, 4, 1),
-      Array(3, 1, 3, 4, 5),
-      Array(3, 4, 4, 3),
-      Array(6, 5, 3))
+      Array(1, -1, 3, -1, 4, -1, 5),
+      Array(2, -1, 3, -1, 1),
+      Array(2, -1, 4, -1, 1),
+      Array(3, -1, 1, -1, 3, -1, 4, -1, 5),
+      Array(3, -1, 4, -1, 4, -1, 3),
+      Array(6, -1, 5, -1, 3))
 
     val rdd = sc.parallelize(sequences, 2).cache()
 
@@ -50,64 +50,225 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
     val result1 = prefixspan.run(rdd)
     val expectedValue1 = Array(
       (Array(1), 4L),
-      (Array(1, 3), 2L),
-      (Array(1, 3, 4), 2L),
-      (Array(1, 3, 4, 5), 2L),
-      (Array(1, 3, 5), 2L),
-      (Array(1, 4), 2L),
-      (Array(1, 4, 5), 2L),
-      (Array(1, 5), 2L),
+      (Array(1, -1, 3), 2L),
+      (Array(1, -1, 3, -1, 4), 2L),
+      (Array(1, -1, 3, -1, 4, -1, 5), 2L),
+      (Array(1, -1, 3, -1, 5), 2L),
+      (Array(1, -1, 4), 2L),
+      (Array(1, -1, 4, -1, 5), 2L),
+      (Array(1, -1, 5), 2L),
       (Array(2), 2L),
-      (Array(2, 1), 2L),
+      (Array(2, -1, 1), 2L),
       (Array(3), 5L),
-      (Array(3, 1), 2L),
-      (Array(3, 3), 2L),
-      (Array(3, 4), 3L),
-      (Array(3, 4, 5), 2L),
-      (Array(3, 5), 2L),
+      (Array(3, -1, 1), 2L),
+      (Array(3, -1, 3), 2L),
+      (Array(3, -1, 4), 3L),
+      (Array(3, -1, 4, -1, 5), 2L),
+      (Array(3, -1, 5), 2L),
       (Array(4), 4L),
-      (Array(4, 5), 2L),
+      (Array(4, -1, 5), 2L),
       (Array(5), 3L)
     )
-    assert(compareResults(expectedValue1, result1.collect()))
+    compareResults(expectedValue1, result1.collect())
 
     prefixspan.setMinSupport(0.5).setMaxPatternLength(50)
     val result2 = prefixspan.run(rdd)
     val expectedValue2 = Array(
       (Array(1), 4L),
       (Array(3), 5L),
-      (Array(3, 4), 3L),
+      (Array(3, -1, 4), 3L),
       (Array(4), 4L),
       (Array(5), 3L)
     )
-    assert(compareResults(expectedValue2, result2.collect()))
+    compareResults(expectedValue2, result2.collect())
 
     prefixspan.setMinSupport(0.33).setMaxPatternLength(2)
     val result3 = prefixspan.run(rdd)
     val expectedValue3 = Array(
       (Array(1), 4L),
-      (Array(1, 3), 2L),
-      (Array(1, 4), 2L),
-      (Array(1, 5), 2L),
-      (Array(2, 1), 2L),
+      (Array(1, -1, 3), 2L),
+      (Array(1, -1, 4), 2L),
+      (Array(1, -1, 5), 2L),
+      (Array(2, -1, 1), 2L),
       (Array(2), 2L),
       (Array(3), 5L),
-      (Array(3, 1), 2L),
-      (Array(3, 3), 2L),
-      (Array(3, 4), 3L),
-      (Array(3, 5), 2L),
+      (Array(3, -1, 1), 2L),
+      (Array(3, -1, 3), 2L),
+      (Array(3, -1, 4), 3L),
+      (Array(3, -1, 5), 2L),
       (Array(4), 4L),
-      (Array(4, 5), 2L),
+      (Array(4, -1, 5), 2L),
       (Array(5), 3L)
     )
-    assert(compareResults(expectedValue3, result3.collect()))
+    compareResults(expectedValue3, result3.collect())
+  }
+
+  test("PrefixSpan using Integer type, variable-size itemsets") {
+    val sequences = Array(
+      Array(1, -1, 1, 2, 3, -1, 1, 3, -1, 4, -1, 3, 6),
+      Array(1, 4, -1, 3, -1, 2, 3, -1, 1, 5),
+      Array(5, 6, -1, 1, 2, -1, 4, 6, -1, 3, -1, 2),
+      Array(5, -1, 7, -1, 1, 6, -1, 3, -1, 2, -1, 3))
+    val rdd = sc.parallelize(sequences, 2).cache()
+    val prefixspan = new PrefixSpan().setMinSupport(0.5).setMaxPatternLength(5)
+    val result = prefixspan.run(rdd)
+
+    /*
+      To verify results, create file "prefixSpanSeqs" with content
+      (format = (transactionID, idxInTransaction, numItemsinItemset, itemset)):
+        1 1 1 1
+        1 2 3 1 2 3
+        1 3 2 1 3
+        1 4 1 4
+        1 5 2 3 6
+        2 1 2 1 4
+        2 2 1 3
+        2 3 2 2 3
+        2 4 2 1 5
+        3 1 2 5 6
+        3 2 2 1 2
+        3 3 2 4 6
+        3 4 1 3
+        3 5 1 2
+        4 1 1 5
+        4 2 1 7
+        4 3 2 1 6
+        4 4 1 3
+        4 5 1 2
+        4 6 1 3
+      In R, run:
+        library("arulesSequences")
+        prefixSpanSeqs = read_baskets("prefixSpanSeqs", info = c("sequenceID","eventID","SIZE"))
+        freqItemSeq = cspade(prefixSpanSeqs,
+                             parameter = list(support = 0.5, maxlen = 5 ))
+        resSeq = as(freqItemSeq, "data.frame")
+        resSeq
+
+           sequence support
+        1  <{1}> 1.00
+        2  <{2}> 1.00
+        3  <{3}> 1.00
+        4  <{4}> 0.75
+        5  <{5}> 0.75
+        6  <{6}> 0.75
+        7  <{1},{6}> 0.50
+        8  <{2},{6}> 0.50
+        9  <{5},{6}> 0.50
+        10 <{1,2},{6}> 0.50
+        11 <{1},{4}> 0.50
+        12 <{2},{4}> 0.50
+        13 <{1,2},{4}> 0.50
+        14 <{1},{3}> 1.00
+        15 <{2},{3}> 0.75
+        16 <{2,3}> 0.50
+        17 <{3},{3}> 0.75
+        18 <{4},{3}> 0.75
+        19 <{5},{3}> 0.50
+        20 <{6},{3}> 0.50
+        21 <{5},{6},{3}> 0.50
+        22 <{6},{2},{3}> 0.50
+        23 <{5},{2},{3}> 0.50
+        24 <{5},{1},{3}> 0.50
+        25 <{2},{4},{3}> 0.50
+        26 <{1},{4},{3}> 0.50
+        27 <{1,2},{4},{3}> 0.50
+        28 <{1},{3},{3}> 0.75
+        29 <{1,2},{3}> 0.50
+        30 <{1},{2},{3}> 0.50
+        31 <{1},{2,3}> 0.50
+        32 <{1},{2}> 1.00
+        33 <{1,2}> 0.50
+        34 <{3},{2}> 0.75
+        35 <{4},{2}> 0.50
+        36 <{5},{2}> 0.50
+        37 <{6},{2}> 0.50
+        38 <{5},{6},{2}> 0.50
+        39 <{6},{3},{2}> 0.50
+        40 <{5},{3},{2}> 0.50
+        41 <{5},{1},{2}> 0.50
+        42 <{4},{3},{2}> 0.50
+        43 <{1},{3},{2}> 0.75
+        44 <{5},{6},{3},{2}> 0.50
+        45 <{5},{1},{3},{2}> 0.50
+        46 <{1},{1}> 0.50
+        47 <{2},{1}> 0.50
+        48 <{3},{1}> 0.50
+        49 <{5},{1}> 0.50
+        50 <{2,3},{1}> 0.50
+        51 <{1},{3},{1}> 0.50
+        52 <{1},{2,3},{1}> 0.50
+        53 <{1},{2},{1}> 0.50
+    */
+    val expectedValue = Array(
+      (Array(1), 4L),
+      (Array(2), 4L),
+      (Array(3), 4L),
+      (Array(4), 3L),
+      (Array(5), 3L),
+      (Array(6), 3L),
+      (Array(1, -1, 6), 2L),
+      (Array(2, -1, 6), 2L),
+      (Array(5, -1, 6), 2L),
+      (Array(1, 2, -1, 6), 2L),
+      (Array(1, -1, 4), 2L),
+      (Array(2, -1, 4), 2L),
+      (Array(1, 2, -1, 4), 2L),
+      (Array(1, -1, 3), 4L),
+      (Array(2, -1, 3), 3L),
+      (Array(2, 3), 2L),
+      (Array(3, -1, 3), 3L),
+      (Array(4, -1, 3), 3L),
+      (Array(5, -1, 3), 2L),
+      (Array(6, -1, 3), 2L),
+      (Array(5, -1, 6, -1, 3), 2L),
+      (Array(6, -1, 2, -1, 3), 2L),
+      (Array(5, -1, 2, -1, 3), 2L),
+      (Array(5, -1, 1, -1, 3), 2L),
+      (Array(2, -1, 4, -1, 3), 2L),
+      (Array(1, -1, 4, -1, 3), 2L),
+      (Array(1, 2, -1, 4, -1, 3), 2L),
+      (Array(1, -1, 3, -1, 3), 3L),
+      (Array(1, 2, -1, 3), 2L),
+      (Array(1, -1, 2, -1, 3), 2L),
+      (Array(1, -1, 2, 3), 2L),
+      (Array(1, -1, 2), 4L),
+      (Array(1, 2), 2L),
+      (Array(3, -1, 2), 3L),
+      (Array(4, -1, 2), 2L),
+      (Array(5, -1, 2), 2L),
+      (Array(6, -1, 2), 2L),
+      (Array(5, -1, 6, -1, 2), 2L),
+      (Array(6, -1, 3, -1, 2), 2L),
+      (Array(5, -1, 3, -1, 2), 2L),
+      (Array(5, -1, 1, -1, 2), 2L),
+      (Array(4, -1, 3, -1, 2), 2L),
+      (Array(1, -1, 3, -1, 2), 3L),
+      (Array(5, -1, 6, -1, 3, -1, 2), 2L),
+      (Array(5, -1, 1, -1, 3, -1, 2), 2L),
+      (Array(1, -1, 1), 2L),
+      (Array(2, -1, 1), 2L),
+      (Array(3, -1, 1), 2L),
+      (Array(5, -1, 1), 2L),
+      (Array(2, 3, -1, 1), 2L),
+      (Array(1, -1, 3, -1, 1), 2L),
+      (Array(1, -1, 2, 3, -1, 1), 2L),
+      (Array(1, -1, 2, -1, 1), 2L))
+
+    compareResults(expectedValue, result.collect())
   }
 
   private def compareResults(
-    expectedValue: Array[(Array[Int], Long)],
-    actualValue: Array[(Array[Int], Long)]): Boolean = {
-    expectedValue.map(x => (x._1.toSeq, x._2)).toSet ==
-      actualValue.map(x => (x._1.toSeq, x._2)).toSet
+      expectedValue: Array[(Array[Int], Long)],
+      actualValue: Array[(Array[Int], Long)]): Unit = {
+    val expectedSet = expectedValue.map(x => (x._1.toSeq, x._2)).toSet
+    val actualSet = actualValue.map(x => (x._1.toSeq, x._2)).toSet
+    assert(expectedSet === actualSet)
+  }
+
+  private def insertDelimiter(sequence: Array[Int]): Array[Int] = {
+    sequence.zip(Seq.fill(sequence.length)(PrefixSpan.DELIMITER)).map { case (a, b) =>
+      List(a, b)
+    }.flatten
  }
 }
--
GitLab
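
Example usage (not part of the patch): a minimal sketch of how the extended API added above can be driven, based on the test suite in this diff. The SparkContext setup and the toy sequences are illustrative assumptions; the PrefixSpan calls (setMinSupport, setMaxPatternLength, run on an RDD[Array[Int]] whose itemsets are separated by the -1 delimiter) mirror the suite.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.fpm.PrefixSpan

object PrefixSpanItemsetExample {
  def main(args: Array[String]): Unit = {
    // Illustrative local SparkContext; any existing context would do.
    val sc = new SparkContext(
      new SparkConf().setAppName("PrefixSpanItemsetExample").setMaster("local[2]"))

    // Sequences of itemsets encoded as Int arrays with -1 separating itemsets,
    // e.g. Array(1, 2, -1, 3) represents the sequence <{1, 2}, {3}>.
    val sequences = sc.parallelize(Seq(
      Array(1, 2, -1, 3),
      Array(1, -1, 3, -1, 3),
      Array(1, 2, -1, 3, -1, 3)), 2).cache()

    val prefixSpan = new PrefixSpan()
      .setMinSupport(0.5)      // keep patterns supported by at least 50% of sequences
      .setMaxPatternLength(5)  // stop extending prefixes beyond 5 itemsets

    // Each result pair is (pattern encoded with -1 delimiters, count of supporting sequences).
    prefixSpan.run(sequences).collect().foreach { case (pattern, count) =>
      println(pattern.mkString("[", ", ", "]") + " -> " + count)
    }

    sc.stop()
  }
}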