Commit bad21ed0 authored by Michael Armbrust

[SPARK-2650][SQL] Build column buffers in smaller batches

Author: Michael Armbrust <michael@databricks.com>

Closes #1880 from marmbrus/columnBatches and squashes the following commits:

0649987 [Michael Armbrust] add test
4756fad [Michael Armbrust] fix compilation
2314532 [Michael Armbrust] Build column buffers in smaller batches
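
This change caps how many rows are materialized into each set of in-memory column buffers. Previously a cached partition was built as one monolithic group of buffers, so a large partition could exhaust memory during caching; now rows are accumulated in batches of spark.sql.inMemoryColumnarStorage.batchSize rows (default 1000), each batch becoming its own group of buffers. A hedged usage sketch, assuming the Spark 1.1-era SQLContext.setConf(String, String); the setup names and the value "10000" are illustrative, not part of this commit:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Placeholder setup; only the configuration key comes from this commit.
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("demo"))
    val sqlContext = new SQLContext(sc)

    // Smaller batches lower the peak memory needed while building column
    // buffers; larger batches tend to compress better.
    sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000")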
parent c686b7dd
@@ -25,6 +25,7 @@ import java.util.Properties
 private[spark] object SQLConf {
   val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
+  val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
   val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
   val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
   val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
@@ -71,6 +72,9 @@ trait SQLConf {
   /** When true tables cached using the in-memory columnar caching will be compressed. */
   private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "false").toBoolean

+  /** The number of rows that will be grouped together into a single column batch when caching. */
+  private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt
+
   /** Number of partitions to use for shuffle operators. */
   private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt
...
@@ -273,7 +273,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
         currentTable.logicalPlan
       case _ =>
-        InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
+        InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)
     }
     catalog.registerTable(None, tableName, asInMemoryRelation)
@@ -284,7 +284,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     table(tableName).queryExecution.analyzed match {
       // This is kind of a hack to make sure that if this was just an RDD registered as a table,
       // we reregister the RDD as a table.
-      case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
+      case inMem @ InMemoryRelation(_, _, _, e: ExistingRdd) =>
         inMem.cachedColumnBuffers.unpersist()
         catalog.unregisterTable(None, tableName)
         catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self))
...
@@ -28,13 +28,14 @@ import org.apache.spark.sql.Row
 import org.apache.spark.SparkConf

 object InMemoryRelation {
-  def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation =
-    new InMemoryRelation(child.output, useCompression, child)()
+  def apply(useCompression: Boolean, batchSize: Int, child: SparkPlan): InMemoryRelation =
+    new InMemoryRelation(child.output, useCompression, batchSize, child)()
 }

 private[sql] case class InMemoryRelation(
     output: Seq[Attribute],
     useCompression: Boolean,
+    batchSize: Int,
     child: SparkPlan)
     (private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null)
   extends LogicalPlan with MultiInstanceRelation {
@@ -43,22 +44,31 @@ private[sql] case class InMemoryRelation(
   // As in Spark, the actual work of caching is lazy.
   if (_cachedColumnBuffers == null) {
     val output = child.output
-    val cached = child.execute().mapPartitions { iterator =>
-      val columnBuilders = output.map { attribute =>
-        ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
-      }.toArray
-
-      var row: Row = null
-      while (iterator.hasNext) {
-        row = iterator.next()
-        var i = 0
-        while (i < row.length) {
-          columnBuilders(i).appendFrom(row, i)
-          i += 1
+    val cached = child.execute().mapPartitions { baseIterator =>
+      new Iterator[Array[ByteBuffer]] {
+        def next() = {
+          val columnBuilders = output.map { attribute =>
+            ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
+          }.toArray
+
+          var row: Row = null
+          var rowCount = 0
+
+          while (baseIterator.hasNext && rowCount < batchSize) {
+            row = baseIterator.next()
+            var i = 0
+            while (i < row.length) {
+              columnBuilders(i).appendFrom(row, i)
+              i += 1
+            }
+            rowCount += 1
+          }
+
+          columnBuilders.map(_.build())
         }
-      }
-
-      Iterator.single(columnBuilders.map(_.build()))
+
+        def hasNext = baseIterator.hasNext
+      }
     }.cache()

     cached.setName(child.toString)
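
For intuition, here is a minimal standalone sketch of the batching pattern above, in plain Scala rather than Spark code: Row is simplified to Array[Any], the column builders to Vector builders, and all names are invented for illustration.

    object BatchingSketch {
      // Group a row iterator into batches of at most `batchSize` rows, making
      // a fresh set of per-column "builders" for each batch so that only one
      // batch is buffered in memory at a time.
      def batched(rows: Iterator[Array[Any]], numColumns: Int, batchSize: Int)
          : Iterator[Array[Vector[Any]]] =
        new Iterator[Array[Vector[Any]]] {
          def hasNext = rows.hasNext
          def next() = {
            val builders = Array.fill(numColumns)(Vector.newBuilder[Any])
            var rowCount = 0
            while (rows.hasNext && rowCount < batchSize) {
              val row = rows.next()
              var i = 0
              while (i < numColumns) { builders(i) += row(i); i += 1 }
              rowCount += 1
            }
            builders.map(_.result())
          }
        }

      def main(args: Array[String]): Unit = {
        val rows = Iterator.tabulate(7)(i => Array[Any](i, "row-" + i))
        // 7 rows with batchSize 3 yield batches of 3, 3 and 1 rows.
        batched(rows, numColumns = 2, batchSize = 3)
          .foreach(batch => println(batch.mkString(" | ")))
      }
    }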
@@ -74,6 +84,7 @@ private[sql] case class InMemoryRelation(
     new InMemoryRelation(
       output.map(_.newInstance),
       useCompression,
+      batchSize,
       child)(
       _cachedColumnBuffers).asInstanceOf[this.type]
   }
@@ -90,22 +101,31 @@ private[sql] case class InMemoryColumnarTableScan(
   override def execute() = {
     relation.cachedColumnBuffers.mapPartitions { iterator =>
-      val columnBuffers = iterator.next()
-      assert(!iterator.hasNext)
+      // Find the ordinals of the requested columns.  If none are requested, use the first.
+      val requestedColumns =
+        if (attributes.isEmpty) {
+          Seq(0)
+        } else {
+          attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
+        }

       new Iterator[Row] {
-        // Find the ordinals of the requested columns.  If none are requested, use the first.
-        val requestedColumns =
-          if (attributes.isEmpty) {
-            Seq(0)
-          } else {
-            attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
-          }
+        private[this] var columnBuffers: Array[ByteBuffer] = null
+        private[this] var columnAccessors: Seq[ColumnAccessor] = null
+        nextBatch()

-        val columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
-        val nextRow = new GenericMutableRow(columnAccessors.length)
+        private[this] val nextRow = new GenericMutableRow(columnAccessors.length)
+
+        def nextBatch() = {
+          columnBuffers = iterator.next()
+          columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
+        }

         override def next() = {
+          if (!columnAccessors.head.hasNext) {
+            nextBatch()
+          }
+
           var i = 0
           while (i < nextRow.length) {
             columnAccessors(i).extractTo(nextRow, i)
@@ -114,7 +134,7 @@ private[sql] case class InMemoryColumnarTableScan(
           nextRow
         }

-        override def hasNext = columnAccessors.head.hasNext
+        override def hasNext = columnAccessors.head.hasNext || iterator.hasNext
       }
     }
   }
...
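On the read side, the scan iterator must now cross batch boundaries, which is what the nextBatch()/hasNext changes above implement. A matching standalone sketch, again in plain Scala with invented names, mirroring the `columnAccessors.head.hasNext || iterator.hasNext` condition:

    object RebatchSketch {
      // Flatten batches of column vectors back into rows. Like the Spark scan,
      // this assumes at least one batch exists and eagerly loads it.
      def rows(batches: Iterator[Array[Vector[Any]]]): Iterator[Array[Any]] =
        new Iterator[Array[Any]] {
          // "Accessors" are just per-column iterators over the current batch.
          private var accessors: Array[Iterator[Any]] = null
          nextBatch()

          private def nextBatch(): Unit = {
            accessors = batches.next().map(_.iterator)
          }

          def hasNext = accessors(0).hasNext || batches.hasNext

          def next() = {
            if (!accessors(0).hasNext) nextBatch() // current batch drained: advance
            accessors.map(_.next())
          }
        }

      def main(args: Array[String]): Unit = {
        val batches = Iterator(
          Array(Vector[Any](1, 2, 3), Vector[Any]("a", "b", "c")),
          Array(Vector[Any](4), Vector[Any]("d")))
        // Prints 1,a  2,b  3,c  4,d -- rows flow across the batch boundary.
        rows(batches).foreach(r => println(r.mkString(",")))
      }
    }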
@@ -22,9 +22,19 @@ import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._

+case class BigData(s: String)
+
 class CachedTableSuite extends QueryTest {
   TestData // Load test tables.

+  test("too big for memory") {
+    val data = "*" * 10000
+    sparkContext.parallelize(1 to 1000000, 1).map(_ => BigData(data)).registerTempTable("bigData")
+    cacheTable("bigData")
+    assert(table("bigData").count() === 1000000L)
+    uncacheTable("bigData")
+  }
+
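For scale: this test builds 1,000,000 rows of 10,000 characters each in a single partition, roughly 10 GB of raw string data, so the old one-batch-per-partition scheme would have had to materialize all of it in column builders at once. With the default batch size of 1000 rows, each batch's buffers hold on the order of 10 MB (1000 × 10,000 bytes) instead.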
test("SPARK-1669: cacheTable should be idempotent") { test("SPARK-1669: cacheTable should be idempotent") {
assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation]) assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
...@@ -37,7 +47,7 @@ class CachedTableSuite extends QueryTest { ...@@ -37,7 +47,7 @@ class CachedTableSuite extends QueryTest {
cacheTable("testData") cacheTable("testData")
table("testData").queryExecution.analyzed match { table("testData").queryExecution.analyzed match {
case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) => case InMemoryRelation(_, _, _, _: InMemoryColumnarTableScan) =>
fail("cacheTable is not idempotent") fail("cacheTable is not idempotent")
case _ => case _ =>
......
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
   test("simple columnar query") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = InMemoryRelation(useCompression = true, plan)
+    val scan = InMemoryRelation(useCompression = true, 5, plan)

     checkAnswer(scan, testData.collect().toSeq)
   }

   test("projection") {
     val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
-    val scan = InMemoryRelation(useCompression = true, plan)
+    val scan = InMemoryRelation(useCompression = true, 5, plan)

     checkAnswer(scan, testData.collect().map {
       case Row(key: Int, value: String) => value -> key
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = InMemoryRelation(useCompression = true, plan)
+    val scan = InMemoryRelation(useCompression = true, 5, plan)

     checkAnswer(scan, testData.collect().toSeq)
     checkAnswer(scan, testData.collect().toSeq)
...
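Note that these suites pass an explicit batch size of 5, far below the 1000-row default, so even tiny test tables span multiple batches and exercise the batch-boundary handling in the scan iterator.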
@@ -137,7 +137,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       castChildOutput(p, table, child)

     case p @ logical.InsertIntoTable(
-           InMemoryRelation(_, _,
+           InMemoryRelation(_, _, _,
              HiveTableScan(_, table, _)), _, child, _) =>
       castChildOutput(p, table, child)
   }
...
@@ -45,7 +45,7 @@ private[hive] trait HiveStrategies {
       case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case logical.InsertIntoTable(
-             InMemoryRelation(_, _,
+             InMemoryRelation(_, _, _,
                HiveTableScan(_, table, _)), partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case _ => Nil
...