Skip to content
Snippets Groups Projects
Commit 26c1089c authored by Josh Rosen's avatar Josh Rosen Committed by Reynold Xin
Browse files

[SPARK-15748][SQL] Replace inefficient foldLeft() call with flatMap() in PartitionStatistics

`PartitionStatistics` uses `foldLeft` and list concatenation (`++`) to flatten an iterator of lists, but this is extremely inefficient compared to simply doing `flatMap`/`flatten` because it performs many unnecessary object allocations. Simply replacing this `foldLeft` by a `flatMap` results in decent performance gains when constructing PartitionStatistics instances for tables with many columns.

This patch fixes this and also makes two similar changes in MLlib and streaming to try to fix all known occurrences of this pattern.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #13491 from JoshRosen/foldleft-to-flatmap.
parent 30c4774f
No related branches found
No related tags found
No related merge requests found
......@@ -422,7 +422,7 @@ private[ml] object MetaAlgorithmReadWrite {
case rformModel: RFormulaModel => Array(rformModel.pipelineModel)
case _: Params => Array()
}
val subStageMaps = subStages.map(getUidMapImpl).foldLeft(List.empty[(String, Params)])(_ ++ _)
val subStageMaps = subStages.flatMap(getUidMapImpl)
List((instance.uid, instance)) ++ subStageMaps
}
}
......@@ -33,9 +33,9 @@ private[columnar] class ColumnStatisticsSchema(a: Attribute) extends Serializabl
}
private[columnar] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable {
val (forAttribute, schema) = {
val (forAttribute: AttributeMap[ColumnStatisticsSchema], schema: Seq[AttributeReference]) = {
val allStats = tableSchema.map(a => a -> new ColumnStatisticsSchema(a))
(AttributeMap(allStats), allStats.map(_._2.schema).foldLeft(Seq.empty[Attribute])(_ ++ _))
(AttributeMap(allStats), allStats.flatMap(_._2.schema))
}
}
......
......@@ -396,11 +396,11 @@ private[ui] class StreamingPage(parent: StreamingTab)
.map(_.ceil.toLong)
.getOrElse(0L)
val content = listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).map {
val content: Seq[Node] = listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).flatMap {
case (streamId, recordRates) =>
generateInputDStreamRow(
jsCollector, streamId, recordRates, minX, maxX, minY, maxYCalculated)
}.foldLeft[Seq[Node]](Nil)(_ ++ _)
}
// scalastyle:off
<table class="table table-bordered" style="width: auto">
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment