diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 327cb974ef96ccf08e23d163c2cedd369002307b..3f8d65a378e2c5b57b7cd0ee336b4e57db58cabc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -144,45 +144,13 @@ class PrefixSpan private (
     logInfo(s"minimum count for a frequent pattern: $minCount")
 
     // Find frequent items.
-    val freqItemAndCounts = data.flatMap { itemsets =>
-        val uniqItems = mutable.Set.empty[Item]
-        itemsets.foreach { _.foreach { item =>
-          uniqItems += item
-        }}
-        uniqItems.toIterator.map((_, 1L))
-      }.reduceByKey(_ + _)
-      .filter { case (_, count) =>
-        count >= minCount
-      }.collect()
-    val freqItems = freqItemAndCounts.sortBy(-_._2).map(_._1)
+    val freqItems = findFrequentItems(data, minCount)
     logInfo(s"number of frequent items: ${freqItems.length}")
 
     // Keep only frequent items from input sequences and convert them to internal storage.
     val itemToInt = freqItems.zipWithIndex.toMap
-    val dataInternalRepr = data.flatMap { itemsets =>
-      val allItems = mutable.ArrayBuilder.make[Int]
-      var containsFreqItems = false
-      allItems += 0
-      itemsets.foreach { itemsets =>
-        val items = mutable.ArrayBuilder.make[Int]
-        itemsets.foreach { item =>
-          if (itemToInt.contains(item)) {
-            items += itemToInt(item) + 1 // using 1-indexing in internal format
-          }
-        }
-        val result = items.result()
-        if (result.nonEmpty) {
-          containsFreqItems = true
-          allItems ++= result.sorted
-        }
-        allItems += 0
-      }
-      if (containsFreqItems) {
-        Iterator.single(allItems.result())
-      } else {
-        Iterator.empty
-      }
-    }.persist(StorageLevel.MEMORY_AND_DISK)
+    val dataInternalRepr = toDatabaseInternalRepr(data, itemToInt)
+      .persist(StorageLevel.MEMORY_AND_DISK)
 
     val results = genFreqPatterns(dataInternalRepr, minCount, maxPatternLength, maxLocalProjDBSize)
 
@@ -231,6 +199,67 @@ class PrefixSpan private (
 @Since("1.5.0")
 object PrefixSpan extends Logging {
 
+  /**
+   * Finds all frequent items in the input dataset.
+   *
+   * @param data Sequences of itemsets.
+   * @param minCount The minimum number of sequences an item must appear in to be frequent.
+   *
+   * @return An array containing only the frequent items, sorted by descending frequency.
+   */
+  private[fpm] def findFrequentItems[Item: ClassTag](
+      data: RDD[Array[Array[Item]]],
+      minCount: Long): Array[Item] = {
+
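+    // Count each item at most once per sequence, then keep the items whose
+    // sequence count reaches minCount, ordered by descending frequency.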
+    data.flatMap { itemsets =>
+      val uniqItems = mutable.Set.empty[Item]
+      itemsets.foreach(set => uniqItems ++= set)
+      uniqItems.toIterator.map((_, 1L))
+    }.reduceByKey(_ + _).filter { case (_, count) =>
+      count >= minCount
+    }.sortBy(-_._2).map(_._1).collect()
+  }
+
+  /**
+   * Removes infrequent items from the input dataset and translates each remaining item
+   * to its corresponding Int identifier.
+   *
+   * @param data Sequences of itemsets.
+   * @param itemToInt A map translating frequent items to their Int identifiers.
+   *                  The map should contain only frequent items.
+   *
+   * @return The internal representation of the input dataset, with properly placed
+   *         zero delimiters.
+   */
+  private[fpm] def toDatabaseInternalRepr[Item: ClassTag](
+      data: RDD[Array[Array[Item]]],
+      itemToInt: Map[Item, Int]): RDD[Array[Int]] = {
+
+    data.flatMap { itemsets =>
+      val allItems = mutable.ArrayBuilder.make[Int]
+      var containsFreqItems = false
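+      // Internal format: itemsets of 1-indexed item ids separated by zeros,
+      // with a leading zero opening the sequence.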
+      allItems += 0
+      itemsets.foreach { itemset =>
+        val items = mutable.ArrayBuilder.make[Int]
+        itemset.foreach { item =>
+          if (itemToInt.contains(item)) {
+            items += itemToInt(item) + 1 // using 1-indexing in internal format
+          }
+        }
+        val result = items.result()
+        if (result.nonEmpty) {
+          containsFreqItems = true
+          allItems ++= result.sorted
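+          // Close the itemset with a zero delimiter; itemsets left empty by
+          // the filtering above contribute no items and no delimiter.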
+          allItems += 0
+        }
+      }
+      if (containsFreqItems) {
+        Iterator.single(allItems.result())
+      } else {
+        Iterator.empty
+      }
+    }
+  }
+
   /**
    * Find the complete set of frequent sequential patterns in the input sequences.
    * @param data ordered sequences of itemsets. We represent a sequence internally as Array[Int],
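The zero-delimited encoding produced by `toDatabaseInternalRepr` is easiest to see on a concrete input. The sketch below mirrors the `flatMap` body above on plain Scala collections, so it runs without a SparkContext; the sample sequence and the `itemToInt` dictionary are invented for illustration.

```scala
import scala.collection.mutable

object InternalReprSketch extends App {
  // Hypothetical frequent-item dictionary: items 4 and 5 are frequent.
  val itemToInt: Map[Int, Int] = Map(4 -> 0, 5 -> 1)

  // One sequence of itemsets; items 1 and 2 are infrequent and get dropped.
  val sequence: Array[Array[Int]] =
    Array(Array(4, 5), Array(1), Array(2), Array(5))

  val allItems = mutable.ArrayBuilder.make[Int]
  allItems += 0 // leading delimiter
  sequence.foreach { itemset =>
    val items = itemset.collect {
      case item if itemToInt.contains(item) => itemToInt(item) + 1 // 1-indexed
    }
    if (items.nonEmpty) {
      allItems ++= items.sorted
      allItems += 0 // delimiter closing each surviving itemset
    }
  }
  // Prints "0, 1, 2, 0, 2, 0": <(4 5) (5)> in the internal format.
  println(allItems.result().mkString(", "))
}
```

The itemsets (1) and (2) lose all of their items, so they disappear along with their delimiters, matching the `expected` arrays in the new tests below.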
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
index 4c2376376dd2af6f332d509aba41ce649cae6e02..c2e08d078fc1a684e3439c9599680c1143ff1613 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -360,6 +360,49 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
     compareResults(expected, model.freqSequences.collect())
   }
 
+  test("PrefixSpan pre-processing: filtering and translation of input sequences") {
+
+    // One item per itemset
+    val itemToInt1 = (4 to 5).zipWithIndex.toMap
+    val sequences1 = Seq(
+      Array(Array(4), Array(1), Array(2), Array(5), Array(2), Array(4), Array(5)),
+      Array(Array(6), Array(7), Array(8)))
+    val rdd1 = sc.parallelize(sequences1, 2).cache()
+
+    val cleanedSequence1 = PrefixSpan.toDatabaseInternalRepr(rdd1, itemToInt1).collect()
+
+    val expected1 = Array(Array(0, 4, 0, 5, 0, 4, 0, 5, 0))
+      .map(_.map(x => if (x == 0) 0 else itemToInt1(x) + 1))
+
+    compareInternalSequences(expected1, cleanedSequence1)
+
+    // Multiple items per itemset
+    val itemToInt2 = (4 to 6).zipWithIndex.toMap
+    val sequences2 = Seq(
+      Array(Array(4, 5), Array(1, 6, 2), Array(2), Array(5), Array(2), Array(4), Array(5, 6, 7)),
+      Array(Array(8, 9), Array(1, 2)))
+    val rdd2 = sc.parallelize(sequences2, 2).cache()
+
+    val cleanedSequence2 = PrefixSpan.toDatabaseInternalRepr(rdd2, itemToInt2).collect()
+
+    val expected2 = Array(Array(0, 4, 5, 0, 6, 0, 5, 0, 4, 0, 5, 6, 0))
+      .map(_.map(x => if (x == 0) 0 else itemToInt2(x) + 1))
+
+    compareInternalSequences(expected2, cleanedSequence2)
+
+    // Sequences that become empty after filtering out infrequent items
+    val itemToInt3 = (10 to 10).zipWithIndex.toMap
+    val sequences3 = Seq(
+      Array(Array(4, 5), Array(1, 6, 2), Array(2), Array(5), Array(2), Array(4), Array(5, 6, 7)),
+      Array(Array(8, 9), Array(1, 2)))
+    val rdd3 = sc.parallelize(sequences3, 2).cache()
+
+    val cleanedSequence3 = PrefixSpan.toDatabaseInternalRepr(rdd3, itemToInt3).collect()
+    val expected3 = Array[Array[Int]]()
+
+    compareInternalSequences(expected3, cleanedSequence3)
+  }
+
   test("model save/load") {
     val sequences = Seq(
       Array(Array(1, 2), Array(3)),
@@ -409,4 +452,12 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
     val actualSet = actualValue.map(x => (x._1.toSeq, x._2)).toSet
     assert(expectedSet === actualSet)
   }
+
+  private def compareInternalSequences(
+      expectedValue: Array[Array[Int]],
+      actualValue: Array[Array[Int]]): Unit = {
+    val expectedSet = expectedValue.map(x => x.toSeq).toSet
+    val actualSet = actualValue.map(x => x.toSeq).toSet
+    assert(expectedSet === actualSet)
+  }
 }
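Since the two extracted helpers stay `private[fpm]`, outside of tests like the ones above they are exercised through `PrefixSpan.run`. A minimal sketch of that public path, assuming a local SparkContext and made-up input sequences:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.fpm.PrefixSpan

object PrefixSpanRunSketch extends App {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[2]").setAppName("PrefixSpanRunSketch"))

  // Made-up sequence database; each sequence is an array of itemsets.
  val sequences = sc.parallelize(Seq(
    Array(Array(1, 2), Array(3)),
    Array(Array(1), Array(3, 2), Array(1, 2))), 2).cache()

  // run() invokes findFrequentItems and toDatabaseInternalRepr internally,
  // as shown in the refactored method above.
  val model = new PrefixSpan()
    .setMinSupport(0.5)
    .setMaxPatternLength(5)
    .run(sequences)

  model.freqSequences.collect().foreach { fs =>
    println(fs.sequence.map(_.mkString("(", ",", ")")).mkString("<", " ", ">") +
      ": " + fs.freq)
  }
  sc.stop()
}
```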