Commit 8529ced3 authored by Matei Zaharia

SPARK-2657 Use more compact data structures than ArrayBuffer in groupBy & cogroup

JIRA: https://issues.apache.org/jira/browse/SPARK-2657

Our current code uses ArrayBuffers for each group of values in groupBy, as well as for the key's elements in CoGroupedRDD. ArrayBuffers have a lot of overhead if there are few values in them, which is likely to happen in cases such as join. In particular, they have a pointer to an Object[] of size 16 by default, which is 24 bytes for the array header + 128 bytes for the pointers in it, plus at least 32 bytes for the ArrayBuffer data structure itself. This patch replaces the per-group buffers with a CompactBuffer class that can store up to 2 elements more efficiently (in its own fields) and acts like an ArrayBuffer beyond that. For a key's elements in CoGroupedRDD, we use an Array of CompactBuffers instead of an ArrayBuffer of ArrayBuffers.
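
For concreteness, the overhead figures above work out roughly as follows (a back-of-the-envelope sketch on a 64-bit JVM; exact sizes depend on the JVM and on settings such as compressed oops):

```scala
// Approximate cost of an ArrayBuffer wrapping a single grouped value, using the
// figures from the description above (rough numbers, not measured here).
val objectArrayHeader = 24       // header of the default Object[16] backing array
val pointerSlots      = 16 * 8   // 16 reference slots, mostly unused for small groups
val arrayBufferItself = 32       // at least this much for the ArrayBuffer object
val perGroupOverhead  = objectArrayHeader + pointerSlots + arrayBufferItself  // ~184 bytes

// By contrast, a CompactBuffer holding one or two values is a single object: two
// inline element fields, an Int size, and a null reference to a not-yet-allocated
// spill array.
```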

There are some changes throughout the code to deal with CoGroupedRDD returning an Array instead. We could also decide not to do that, but CoGroupedRDD is a `DeveloperApi`, so I think it's okay to change it here.
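
A minimal sketch of what that API change looks like for a consumer of CoGroupedRDD (assumes an existing SparkContext `sc`; the input RDDs, value types, and partition count are illustrative placeholders, not part of the patch):

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._          // pair-RDD implicits (mapValues)
import org.apache.spark.rdd.CoGroupedRDD

val left  = sc.parallelize(Seq((1, "a"), (2, "b")))
val right = sc.parallelize(Seq((1, 10.0), (3, 30.0)))
val cg = new CoGroupedRDD[Int](Seq(left, right), new HashPartitioner(2))

// Each record now carries an Array with one Iterable per input RDD (matched by
// position) instead of the old Seq[Seq[_]], so callers pattern-match on Array.
val grouped = cg.mapValues { case Array(vs, ws) =>
  (vs.asInstanceOf[Iterable[String]], ws.asInstanceOf[Iterable[Double]])
}
```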

Author: Matei Zaharia <matei@databricks.com>

Closes #1555 from mateiz/compact-groupby and squashes the following commits:

845a356 [Matei Zaharia] Lower initial size of CompactBuffer's vector to 8
07621a7 [Matei Zaharia] Review comments
0c1cd12 [Matei Zaharia] Don't use varargs in CompactBuffer.apply
bdc8a39 [Matei Zaharia] Small tweak to +=, and typos
f61f040 [Matei Zaharia] Fix line lengths
59da88b0 [Matei Zaharia] Fix line lengths
197cde8 [Matei Zaharia] Make CompactBuffer extend Seq to make its toSeq more efficient
775110f [Matei Zaharia] Change CoGroupedRDD to give (K, Array[Iterable[_]]) to avoid wrappers
9b4c6e8 [Matei Zaharia] Use CompactBuffer in CoGroupedRDD
ed577ab [Matei Zaharia] Use CompactBuffer in groupByKey
10f0de1 [Matei Zaharia] A CompactBuffer that's more memory-efficient than ArrayBuffer for small buffers
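
The new buffer is used like a mutable Seq; a small illustrative sketch (CompactBuffer is private[spark], so this reflects Spark-internal call sites such as groupByKey and cogroup rather than user code):

```scala
import org.apache.spark.util.collection.CompactBuffer

val buf = CompactBuffer("a")   // one element, kept in a field of the buffer itself
buf += "b"                     // second element, also kept in a field; still no array
buf += "c"                     // third element spills into an Array[AnyRef] of initial size 8

val other = CompactBuffer[String]()
other += "d"
buf ++= other                  // buffer-to-buffer merge, the path used by mergeCombiners

assert(buf.toList == List("a", "b", "c", "d"))  // CompactBuffer extends Seq, so it acts like one
```
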
parent 2f75a4a3
Showing 334 additions and 43 deletions
......@@ -25,7 +25,7 @@ import scala.language.existentials
import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle
......@@ -66,14 +66,14 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
*/
@DeveloperApi
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
// For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs).
// Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner.
// CoGroupValue is the intermediate state of each value before being merged in compute.
private type CoGroup = ArrayBuffer[Any]
private type CoGroup = CompactBuffer[Any]
private type CoGroupValue = (Any, Int) // Int is dependency number
private type CoGroupCombiner = Seq[CoGroup]
private type CoGroupCombiner = Array[CoGroup]
private var serializer: Option[Serializer] = None
......@@ -114,7 +114,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override val partitioner: Some[Partitioner] = Some(part)
override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
val sparkConf = SparkEnv.get.conf
val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
val split = s.asInstanceOf[CoGroupPartition]
......@@ -150,7 +150,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
getCombiner(kv._1)(depNum) += kv._2
}
}
new InterruptibleIterator(context, map.iterator)
new InterruptibleIterator(context,
map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
} else {
val map = createExternalMap(numRdds)
rddIterators.foreach { case (it, depNum) =>
......@@ -161,7 +162,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
}
context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
new InterruptibleIterator(context, map.iterator)
new InterruptibleIterator(context,
map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
}
......
......@@ -46,6 +46,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.collection.CompactBuffer
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
......@@ -361,12 +362,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => ArrayBuffer(v)
val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2
val bufs = combineByKey[ArrayBuffer[V]](
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKey[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
bufs.mapValues(_.toIterable)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
/**
......@@ -571,11 +572,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
(vs.asInstanceOf[Seq[V]],
w1s.asInstanceOf[Seq[W1]],
w2s.asInstanceOf[Seq[W2]],
w3s.asInstanceOf[Seq[W3]])
cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
(vs.asInstanceOf[Iterable[V]],
w1s.asInstanceOf[Iterable[W1]],
w2s.asInstanceOf[Iterable[W2]],
w3s.asInstanceOf[Iterable[W3]])
}
}
......@@ -589,8 +590,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { case Seq(vs, w1s) =>
(vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
cg.mapValues { case Array(vs, w1s) =>
(vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
}
}
......@@ -604,10 +605,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
cg.mapValues { case Seq(vs, w1s, w2s) =>
(vs.asInstanceOf[Seq[V]],
w1s.asInstanceOf[Seq[W1]],
w2s.asInstanceOf[Seq[W2]])
cg.mapValues { case Array(vs, w1s, w2s) =>
(vs.asInstanceOf[Iterable[V]],
w1s.asInstanceOf[Iterable[W1]],
w2s.asInstanceOf[Iterable[W2]])
}
}
......
......@@ -31,6 +31,7 @@ import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage._
import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
import org.apache.spark.util.BoundedPriorityQueue
import org.apache.spark.util.collection.CompactBuffer
import scala.reflect.ClassTag
......@@ -185,6 +186,7 @@ private[serializer] object KryoSerializer {
classOf[GotBlock],
classOf[GetBlock],
classOf[MapStatus],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Byte]],
classOf[BoundedPriorityQueue[_]],
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util.collection
/**
* An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
* ArrayBuffer always allocates an Object array to store the data, with 16 entries by default,
* so it has about 80-100 bytes of overhead. In contrast, CompactBuffer can keep up to two
* elements in fields of the main object, and only allocates an Array[AnyRef] if there are more
* entries than that. This makes it more efficient for operations like groupBy where we expect
* some keys to have very few elements.
*/
private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
// First two elements
private var element0: T = _
private var element1: T = _
// Number of elements, including our two in the main object
private var curSize = 0
// Array for extra elements
private var otherElements: Array[AnyRef] = null
def apply(position: Int): T = {
if (position < 0 || position >= curSize) {
throw new IndexOutOfBoundsException
}
if (position == 0) {
element0
} else if (position == 1) {
element1
} else {
otherElements(position - 2).asInstanceOf[T]
}
}
private def update(position: Int, value: T): Unit = {
if (position < 0 || position >= curSize) {
throw new IndexOutOfBoundsException
}
if (position == 0) {
element0 = value
} else if (position == 1) {
element1 = value
} else {
otherElements(position - 2) = value.asInstanceOf[AnyRef]
}
}
def += (value: T): CompactBuffer[T] = {
val newIndex = curSize
if (newIndex == 0) {
element0 = value
curSize = 1
} else if (newIndex == 1) {
element1 = value
curSize = 2
} else {
growToSize(curSize + 1)
otherElements(newIndex - 2) = value.asInstanceOf[AnyRef]
}
this
}
def ++= (values: TraversableOnce[T]): CompactBuffer[T] = {
values match {
// Optimize merging of CompactBuffers, used in cogroup and groupByKey
case compactBuf: CompactBuffer[T] =>
val oldSize = curSize
// Copy the other buffer's size and elements to local variables in case it is equal to us
val itsSize = compactBuf.curSize
val itsElements = compactBuf.otherElements
growToSize(curSize + itsSize)
if (itsSize == 1) {
this(oldSize) = compactBuf.element0
} else if (itsSize == 2) {
this(oldSize) = compactBuf.element0
this(oldSize + 1) = compactBuf.element1
} else if (itsSize > 2) {
this(oldSize) = compactBuf.element0
this(oldSize + 1) = compactBuf.element1
// At this point our size is also above 2, so just copy its array directly into ours.
// Note that since we added two elements above, the index in this.otherElements that we
// should copy to is oldSize.
System.arraycopy(itsElements, 0, otherElements, oldSize, itsSize - 2)
}
case _ =>
values.foreach(e => this += e)
}
this
}
override def length: Int = curSize
override def size: Int = curSize
override def iterator: Iterator[T] = new Iterator[T] {
private var pos = 0
override def hasNext: Boolean = pos < curSize
override def next(): T = {
if (!hasNext) {
throw new NoSuchElementException
}
pos += 1
apply(pos - 1)
}
}
/** Increase our size to newSize and grow the backing array if needed. */
private def growToSize(newSize: Int): Unit = {
if (newSize < 0) {
throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements")
}
val capacity = if (otherElements != null) otherElements.length + 2 else 2
if (newSize > capacity) {
var newArrayLen = 8
while (newSize - 2 > newArrayLen) {
newArrayLen *= 2
if (newArrayLen == Int.MinValue) {
// Prevent overflow if we double from 2^30 to 2^31, which will become Int.MinValue.
// Note that we set the new array length to Int.MaxValue - 2 so that our capacity
// calculation above still gives a positive integer.
newArrayLen = Int.MaxValue - 2
}
}
val newArray = new Array[AnyRef](newArrayLen)
if (otherElements != null) {
System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
}
otherElements = newArray
}
curSize = newSize
}
}
private[spark] object CompactBuffer {
def apply[T](): CompactBuffer[T] = new CompactBuffer[T]
def apply[T](value: T): CompactBuffer[T] = {
val buf = new CompactBuffer[T]
buf += value
}
}
......@@ -156,15 +156,20 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
test("CoGroupedRDD") {
val longLineageRDD1 = generateFatPairRDD()
// Collect the RDD as sequences instead of arrays to enable equality tests in testRDD
val seqCollectFunc = (rdd: RDD[(Int, Array[Iterable[Int]])]) =>
rdd.map{case (p, a) => (p, a.toSeq)}.collect(): Any
testRDD(rdd => {
CheckpointSuite.cogroup(longLineageRDD1, rdd.map(x => (x % 2, 1)), partitioner)
})
}, seqCollectFunc)
val longLineageRDD2 = generateFatPairRDD()
testRDDPartitions(rdd => {
CheckpointSuite.cogroup(
longLineageRDD2, sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)), partitioner)
})
}, seqCollectFunc)
}
test("ZippedPartitionsRDD") {
......@@ -235,12 +240,19 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
assert(rdd.partitions.size === 0)
}
def defaultCollectFunc[T](rdd: RDD[T]): Any = rdd.collect()
/**
* Test checkpointing of the RDD generated by the given operation. It tests whether the
* serialized size of the RDD is reduced after checkpointing or not. This function should be called
* on all RDDs that have a parent RDD (i.e., do not call on ParallelCollection, BlockRDD, etc.).
*
* @param op an operation to run on the RDD
* @param collectFunc a function for collecting the values in the RDD, in case there are
* non-comparable types like arrays that we want to convert to something that supports ==
*/
def testRDD[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
def testRDD[U: ClassTag](op: (RDD[Int]) => RDD[U],
collectFunc: RDD[U] => Any = defaultCollectFunc[U] _) {
// Generate the final RDD using given RDD operation
val baseRDD = generateFatRDD()
val operatedRDD = op(baseRDD)
......@@ -258,13 +270,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
operatedRDD.checkpoint()
val result = operatedRDD.collect()
val result = collectFunc(operatedRDD)
operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
// Test whether the checkpoint file has been created
assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
assert(collectFunc(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get)) === result)
// Test whether dependencies have been changed from its earlier parent RDD
assert(operatedRDD.dependencies.head.rdd != parentRDD)
......@@ -279,7 +291,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
assert(operatedRDD.partitions.length === numPartitions)
// Test whether the data in the checkpointed RDD is same as original
assert(operatedRDD.collect() === result)
assert(collectFunc(operatedRDD) === result)
// Test whether serialized size of the RDD has reduced.
logInfo("Size of " + rddType +
......@@ -289,7 +301,6 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
"Size of " + rddType + " did not reduce after checkpointing " +
" [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
)
}
/**
......@@ -300,8 +311,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
* This function should be called on only those RDDs whose partitions refer to the parent RDD's
* partitions (i.e., do not call it on simple RDDs like MappedRDD).
*
* @param op an operation to run on the RDD
* @param collectFunc a function for collecting the values in the RDD, in case there are
* non-comparable types like arrays that we want to convert to something that supports ==
*/
def testRDDPartitions[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
def testRDDPartitions[U: ClassTag](op: (RDD[Int]) => RDD[U],
collectFunc: RDD[U] => Any = defaultCollectFunc[U] _) {
// Generate the final RDD using given RDD operation
val baseRDD = generateFatRDD()
val operatedRDD = op(baseRDD)
......@@ -316,13 +331,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
parentRDDs.foreach(_.checkpoint()) // checkpoint the parent RDD, not the generated one
val result = operatedRDD.collect() // force checkpointing
val result = collectFunc(operatedRDD) // force checkpointing
operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
// Test whether the data in the checkpointed RDD is same as original
assert(operatedRDD.collect() === result)
assert(collectFunc(operatedRDD) === result)
// Test whether serialized size of the partitions has reduced
logInfo("Size of partitions of " + rddType +
......@@ -436,7 +451,7 @@ object CheckpointSuite {
new CoGroupedRDD[K](
Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
part
).asInstanceOf[RDD[(K, Seq[Seq[V]])]]
).asInstanceOf[RDD[(K, Array[Iterable[V]])]]
}
}
......@@ -176,7 +176,9 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2)).collectAsMap()
val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2))
.map(p => (p._1, p._2.map(_.toArray)))
.collectAsMap()
assert(results(1)(0).length === 3)
assert(results(1)(0).contains(1))
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util.collection
import org.scalatest.FunSuite
class CompactBufferSuite extends FunSuite {
test("empty buffer") {
val b = new CompactBuffer[Int]
assert(b.size === 0)
assert(b.iterator.toList === Nil)
assert(b.size === 0)
assert(b.iterator.toList === Nil)
intercept[IndexOutOfBoundsException] { b(0) }
intercept[IndexOutOfBoundsException] { b(1) }
intercept[IndexOutOfBoundsException] { b(2) }
intercept[IndexOutOfBoundsException] { b(-1) }
}
test("basic inserts") {
val b = new CompactBuffer[Int]
assert(b.size === 0)
assert(b.iterator.toList === Nil)
for (i <- 0 until 1000) {
b += i
assert(b.size === i + 1)
assert(b(i) === i)
}
assert(b.iterator.toList === (0 until 1000).toList)
assert(b.iterator.toList === (0 until 1000).toList)
assert(b.size === 1000)
}
test("adding sequences") {
val b = new CompactBuffer[Int]
assert(b.size === 0)
assert(b.iterator.toList === Nil)
// Add some simple lists and iterators
b ++= List(0)
assert(b.size === 1)
assert(b.iterator.toList === List(0))
b ++= Iterator(1)
assert(b.size === 2)
assert(b.iterator.toList === List(0, 1))
b ++= List(2)
assert(b.size === 3)
assert(b.iterator.toList === List(0, 1, 2))
b ++= Iterator(3, 4, 5, 6, 7, 8, 9)
assert(b.size === 10)
assert(b.iterator.toList === (0 until 10).toList)
// Add CompactBuffers
val b2 = new CompactBuffer[Int]
b2 ++= 0 until 10
b ++= b2
assert(b.iterator.toList === (1 to 2).flatMap(i => 0 until 10).toList)
b ++= b2
assert(b.iterator.toList === (1 to 3).flatMap(i => 0 until 10).toList)
b ++= b2
assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList)
// Add some small CompactBuffers as well
val b3 = new CompactBuffer[Int]
b ++= b3
assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList)
b3 += 0
b ++= b3
assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList ++ List(0))
b3 += 1
b ++= b3
assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList ++ List(0, 0, 1))
b3 += 2
b ++= b3
assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList ++ List(0, 0, 1, 0, 1, 2))
}
test("adding the same buffer to itself") {
val b = new CompactBuffer[Int]
assert(b.size === 0)
assert(b.iterator.toList === Nil)
b += 1
assert(b.toList === List(1))
for (j <- 1 until 8) {
b ++= b
assert(b.size === (1 << j))
assert(b.iterator.toList === (1 to (1 << j)).map(i => 1).toList)
}
}
}
......@@ -122,6 +122,10 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]], input: RDD[Vector]): RDD[Vector] = {
val partitioner = new HashPartitioner(input.partitions.size)
val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
cogrouped.map { case (_, values: Seq[Seq[Double]]) => new DenseVector(values.flatten.toArray) }
cogrouped.map {
case (_, values: Array[Iterable[_]]) =>
val doubles = values.asInstanceOf[Array[Iterable[Double]]]
new DenseVector(doubles.flatten.toArray)
}
}
}
......@@ -133,17 +133,17 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
val numOldValues = oldRDDs.size
val numNewValues = newRDDs.size
val mergeValues = (seqOfValues: Seq[Seq[V]]) => {
if (seqOfValues.size != 1 + numOldValues + numNewValues) {
val mergeValues = (arrayOfValues: Array[Iterable[V]]) => {
if (arrayOfValues.size != 1 + numOldValues + numNewValues) {
throw new Exception("Unexpected number of sequences of reduced values")
}
// Getting reduced values "old time steps" that will be removed from current window
val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
val oldValues = (1 to numOldValues).map(i => arrayOfValues(i)).filter(!_.isEmpty).map(_.head)
// Getting reduced values "new time steps"
val newValues =
(1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
(1 to numNewValues).map(i => arrayOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
if (seqOfValues(0).isEmpty) {
if (arrayOfValues(0).isEmpty) {
// If previous window's reduce value does not exist, then at least new values should exist
if (newValues.isEmpty) {
throw new Exception("Neither previous window has value for key, nor new values found. " +
......@@ -153,7 +153,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
newValues.reduce(reduceF) // return
} else {
// Get the previous window's reduced value
var tempValue = seqOfValues(0).head
var tempValue = arrayOfValues(0).head
// If old values exist, then inverse-reduce them from the previous value
if (!oldValues.isEmpty) {
tempValue = invReduceF(tempValue, oldValues.reduce(reduceF))
......@@ -166,7 +166,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
}
}
val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues)
val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]]
.mapValues(mergeValues)
if (filterFunc.isDefined) {
Some(mergedValuesRDD.filter(filterFunc.get))
......