Commit 59236e5c authored by Michael Armbrust, committed by Shixiong Zhu
[SPARK-14288][SQL] Memory Sink for streaming

This PR exposes the internal testing `MemorySink` through the data source API.  This will allow users to easily test streaming applications in the Spark shell or in other local tests.

Usage:
```scala
inputStream.write
  .format("memory")
  .queryName("memStream")
  .startStream()

// Now you can query the result of the stream here.
sqlContext.table("memStream")
```
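
Since the sink registers its output as a temporary table, the results can also be inspected with plain SQL while the stream runs. A minimal follow-on sketch (reusing the `inputStream`/`memStream` names from the snippet above; `processAllAvailable()` and `stop()` come from the `ContinuousQuery` handle returned by `startStream()`):

```scala
// Keep the handle returned by startStream() so the query can be managed later.
val query = inputStream.write
  .format("memory")
  .queryName("memStream")
  .startStream()

// Block until everything currently available at the source has been processed,
// then inspect the in-memory results with SQL or the table API.
query.processAllAvailable()
sqlContext.sql("SELECT count(*) FROM memStream").show()

// The results live only in memory, so stop the query when it is no longer needed.
query.stop()
```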

The most complicated part of the logic is choosing the checkpoint directory.  There are a few requirements we are attempting to satisfy here:
 - when working in the shell locally, it should just work with no extra configuration.
 - when working on a cluster, it should be easy to place the checkpoint on a distributed file system so that aggregations can be tested (state checkpoints are also stored in this directory and must be accessible from workers).
 - it should be clear that you can't resume since the data is just in memory.

The chosen algorithm proceeds as follows (a sketch of the explicit-location case follows this list):
 - if the user gives a checkpoint directory, use it
 - otherwise, if the conf has a checkpoint location, use `$location/$queryName`
 - if neither is set, create a local temporary directory
 - always check to make sure there are no offsets already written to the directory
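
For the cluster case, a sketch of pointing the sink's checkpoint at a shared location (the HDFS path below is purely illustrative):

```scala
// Illustrative: place the checkpoint on a distributed file system so that state
// checkpoints are reachable from the workers (the path here is hypothetical).
val query = inputStream.write
  .format("memory")
  .queryName("memStream")
  .option("checkpointLocation", "hdfs:///tmp/memStream-checkpoint")
  .startStream()

// Starting another query against the same location is rejected with an
// AnalysisException, because offsets already exist there and in-memory results
// cannot be resumed; delete the directory to start over.
```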

Author: Michael Armbrust <michael@databricks.com>

Closes #12119 from marmbrus/memorySink.
parent 5e64dab8
@@ -29,8 +29,10 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.HadoopFsRelation
import org.apache.spark.util.Utils
/**
 * :: Experimental ::
@@ -275,23 +277,64 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   * @since 2.0.0
   */
  def startStream(): ContinuousQuery = {
    val dataSource =
      DataSource(
        df.sqlContext,
        className = source,
        options = extraOptions.toMap,
        partitionColumns = normalizedParCols.getOrElse(Nil))
    val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
    val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
      new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString
    })
    df.sqlContext.sessionState.continuousQueryManager.startQuery(
      queryName,
      checkpointLocation,
      df,
      dataSource.createSink(),
      trigger)
    if (source == "memory") {
      val queryName =
        extraOptions.getOrElse(
          "queryName", throw new AnalysisException("queryName must be specified for memory sink"))
      val checkpointLocation = extraOptions.get("checkpointLocation").map { userSpecified =>
        new Path(userSpecified).toUri.toString
      }.orElse {
        val checkpointConfig: Option[String] =
          df.sqlContext.conf.getConf(
            SQLConf.CHECKPOINT_LOCATION,
            None)

        checkpointConfig.map { location =>
          new Path(location, queryName).toUri.toString
        }
      }.getOrElse {
        Utils.createTempDir(namePrefix = "memory.stream").getCanonicalPath
      }

      // If offsets have already been created, we are trying to resume a query.
      val checkpointPath = new Path(checkpointLocation, "offsets")
      val fs = checkpointPath.getFileSystem(df.sqlContext.sparkContext.hadoopConfiguration)
      if (fs.exists(checkpointPath)) {
        throw new AnalysisException(
          s"Unable to resume query written to memory sink. Delete $checkpointPath to start over.")
      } else {
        checkpointPath.toUri.toString
      }
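      // Create the in-memory sink and register a temporary table backed by a MemoryPlan
      // over it, so the stream's output can be queried with sqlContext.table(queryName).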
      val sink = new MemorySink(df.schema)
      val resultDf = Dataset.ofRows(df.sqlContext, new MemoryPlan(sink))
      resultDf.registerTempTable(queryName)
      val continuousQuery = df.sqlContext.sessionState.continuousQueryManager.startQuery(
        queryName,
        checkpointLocation,
        df,
        sink,
        trigger)
      continuousQuery
    } else {
      val dataSource =
        DataSource(
          df.sqlContext,
          className = source,
          options = extraOptions.toMap,
          partitionColumns = normalizedParCols.getOrElse(Nil))
      val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
      val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
        new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString
      })
      df.sqlContext.sessionState.continuousQueryManager.startQuery(
        queryName,
        checkpointLocation,
        df,
        dataSource.createSink(),
        trigger)
    }
  }

  /**
......
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
@@ -30,6 +31,7 @@ import org.apache.spark.sql.execution.command.{DescribeCommand => RunnableDescri
import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, _}
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution.streaming.MemoryPlan
import org.apache.spark.sql.internal.SQLConf
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
@@ -332,6 +334,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case r: RunnableCommand => ExecutedCommand(r) :: Nil
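      // Serve the rows collected by a memory sink as a local table scan, converting each
      // external Row back to an InternalRow with an encoder for the sink's schema.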
      case MemoryPlan(sink, output) =>
        val encoder = RowEncoder(sink.schema)
        LocalTableScan(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil

      case logical.Distinct(child) =>
        throw new IllegalStateException(
          "logical distinct operator should have been replaced by aggregate in the optimizer")
......
@@ -25,6 +25,8 @@ import scala.util.control.NonFatal
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row, SQLContext}
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.types.StructType
object MemoryStream {
@@ -136,3 +138,9 @@ class MemorySink(val schema: StructType) extends Sink with Logging {
  }
}
/**
 * Used to query the data that has been written into a [[MemorySink]].
 */
case class MemoryPlan(sink: MemorySink, output: Seq[Attribute]) extends LeafNode {
  def this(sink: MemorySink) = this(sink, sink.schema.toAttributes)
}
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.streaming.MemoryPlan
abstract class QueryTest extends PlanTest {
@@ -200,6 +201,7 @@ abstract class QueryTest extends PlanTest {
    logicalPlan.transform {
      case _: ObjectOperator => return
      case _: LogicalRelation => return
      case _: MemoryPlan => return
    }.transformAllExpressions {
      case a: ImperativeAggregate => return
    }
......
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.streaming

import org.apache.spark.sql.{AnalysisException, Row, StreamTest}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils

class MemorySinkSuite extends StreamTest with SharedSQLContext {
  import testImplicits._

  test("registering as a table") {
    val input = MemoryStream[Int]
    val query = input.toDF().write
      .format("memory")
      .queryName("memStream")
      .startStream()
    input.addData(1, 2, 3)
    query.processAllAvailable()

    checkDataset(
      sqlContext.table("memStream").as[Int],
      1, 2, 3)

    input.addData(4, 5, 6)
    query.processAllAvailable()

    checkDataset(
      sqlContext.table("memStream").as[Int],
      1, 2, 3, 4, 5, 6)

    query.stop()
  }

  test("error when no name is specified") {
    val error = intercept[AnalysisException] {
      val input = MemoryStream[Int]
      val query = input.toDF().write
        .format("memory")
        .startStream()
    }

    assert(error.message contains "queryName must be specified")
  }

  test("error if attempting to resume specific checkpoint") {
    val location = Utils.createTempDir("streaming.checkpoint").getCanonicalPath
    val input = MemoryStream[Int]
    val query = input.toDF().write
      .format("memory")
      .queryName("memStream")
      .option("checkpointLocation", location)
      .startStream()
    input.addData(1, 2, 3)
    query.processAllAvailable()
    query.stop()

    intercept[AnalysisException] {
      input.toDF().write
        .format("memory")
        .queryName("memStream")
        .option("checkpointLocation", location)
        .startStream()
    }
  }
}