diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 009fbaa006574874de1604d51ba016e235b589f5..ba61940b3d5a4c268629172257f4715ece3810d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.execution.columnar
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
 
 import org.apache.commons.lang.StringUtils
 
-import org.apache.spark.{Accumulable, Accumulator}
+import org.apache.spark.Accumulator
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.UserDefinedType
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.AccumulatorContext
+import org.apache.spark.util.{AccumulatorContext, ListAccumulator}
 
 
 private[sql] object InMemoryRelation {
@@ -67,14 +67,14 @@ private[sql] case class InMemoryRelation(
     tableName: Option[String])(
     @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null,
     @transient private[sql] var _statistics: Statistics = null,
-    private[sql] var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
+    private[sql] var _batchStats: ListAccumulator[InternalRow] = null)
   extends logical.LeafNode with MultiInstanceRelation {
 
   override def producedAttributes: AttributeSet = outputSet
 
-  private[sql] val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] =
+  private[sql] val batchStats: ListAccumulator[InternalRow] =
     if (_batchStats == null) {
-      child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow])
+      child.sqlContext.sparkContext.listAccumulator[InternalRow]
     } else {
       _batchStats
     }
@@ -87,7 +87,7 @@ private[sql] case class InMemoryRelation(
         output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
         partitionStatistics.schema)
 
-    batchStats.value.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
+    batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
   }
 
   // Statistics propagation contracts:
@@ -169,7 +169,7 @@ private[sql] case class InMemoryRelation(
         val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
           .flatMap(_.values))
 
-        batchStats += stats
+        batchStats.add(stats)
         CachedBatch(rowCount, columnBuilders.map { builder =>
           JavaUtils.bufferToArray(builder.build())
         }, stats)