Skip to content
Snippets Groups Projects
Commit 733b81b8 authored by Bill Chambers's avatar Bill Chambers Committed by Burak Yavuz
Browse files

[SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans

## What changes were proposed in this pull request?

We didn't enforce analyzed plans in Spark 2.1 when writing out to Kafka.

## How was this patch tested?

New unit test.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Bill Chambers <bill@databricks.com>

Closes #17804 from anabranch/SPARK-20496-2.
parent 8c911ada
No related branches found
No related tags found
No related merge requests found
......@@ -47,7 +47,7 @@ private[kafka010] object KafkaWriter extends Logging {
queryExecution: QueryExecution,
kafkaParameters: ju.Map[String, Object],
topic: Option[String] = None): Unit = {
val schema = queryExecution.logical.output
val schema = queryExecution.analyzed.output
schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
if (topic == None) {
throw new AnalysisException(s"topic option required when no " +
......@@ -84,7 +84,7 @@ private[kafka010] object KafkaWriter extends Logging {
queryExecution: QueryExecution,
kafkaParameters: ju.Map[String, Object],
topic: Option[String] = None): Unit = {
val schema = queryExecution.logical.output
val schema = queryExecution.analyzed.output
validateQuery(queryExecution, kafkaParameters, topic)
SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
queryExecution.toRdd.foreachPartition { iter =>
......
......@@ -28,6 +28,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{BinaryType, DataType}
......@@ -108,6 +109,21 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
s"save mode overwrite not allowed for kafka"))
}
// Regression test for SPARK-20496: KafkaWriter previously validated the
// *unanalyzed* logical plan (queryExecution.logical), so writing a plan whose
// output attributes were still unresolved (e.g. the aliased to_json column
// below) failed with UnresolvedException. With the fix it validates
// queryExecution.analyzed instead, and this batch write succeeds.
test("SPARK-20496: batch - enforce analyzed plans") {
// Build a DataFrame whose single column is an alias ('value) over to_json —
// this column is only resolved after analysis, which triggers the old bug.
val inputEvents =
spark.range(1, 1000)
.select(to_json(struct("*")) as 'value)
val topic = newTopic()
testUtils.createTopic(topic)
// used to throw UnresolvedException
inputEvents.write
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", topic)
.save()
}
test("streaming - write to kafka with topic field") {
val input = MemoryStream[String]
val topic = newTopic()
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.