Skip to content
Snippets Groups Projects
Commit 7a00c658 authored by hyukjinkwon's avatar hyukjinkwon Committed by Shixiong Zhu
Browse files

[SPARK-21147][SS] Throws an analysis exception when a user-specified schema is given in socket/rate sources

[SPARK-21147][SS] Throws an analysis exception when a user-specified schema is given in socket/rate sources

## What changes were proposed in this pull request?

This PR proposes to throw an exception if a schema is provided by user to socket source as below:

**socket source**

```scala
import org.apache.spark.sql.types._

val userSpecifiedSchema = StructType(
  StructField("name", StringType) ::
  StructField("area", StringType) :: Nil)
val df = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).schema(userSpecifiedSchema).load
df.printSchema
```

Before

```
root
 |-- value: string (nullable = true)
```

After

```
org.apache.spark.sql.AnalysisException: The socket source does not support a user-specified schema.;
  at org.apache.spark.sql.execution.streaming.TextSocketSourceProvider.sourceSchema(socket.scala:199)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:192)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
  at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
  ... 50 elided
```

**rate source**

```scala
spark.readStream.format("rate").schema(spark.range(1).schema).load().printSchema()
```

Before

```
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
```

After

```
org.apache.spark.sql.AnalysisException: The rate source does not support a user-specified schema.;
  at org.apache.spark.sql.execution.streaming.RateSourceProvider.sourceSchema(RateSourceProvider.scala:57)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:192)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
  at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
  ... 48 elided
```

## How was this patch tested?

Unit test in `TextSocketStreamSuite` and `RateSourceSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18365 from HyukjinKwon/SPARK-21147.
parent ad459cfb
No related branches found
No related tags found
No related merge requests found
...@@ -25,7 +25,7 @@ import org.apache.commons.io.IOUtils ...@@ -25,7 +25,7 @@ import org.apache.commons.io.IOUtils
import org.apache.spark.internal.Logging import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
...@@ -52,8 +52,13 @@ class RateSourceProvider extends StreamSourceProvider with DataSourceRegister { ...@@ -52,8 +52,13 @@ class RateSourceProvider extends StreamSourceProvider with DataSourceRegister {
sqlContext: SQLContext, sqlContext: SQLContext,
schema: Option[StructType], schema: Option[StructType],
providerName: String, providerName: String,
parameters: Map[String, String]): (String, StructType) = parameters: Map[String, String]): (String, StructType) = {
if (schema.nonEmpty) {
throw new AnalysisException("The rate source does not support a user-specified schema.")
}
(shortName(), RateSourceProvider.SCHEMA) (shortName(), RateSourceProvider.SCHEMA)
}
override def createSource( override def createSource(
sqlContext: SQLContext, sqlContext: SQLContext,
......
...@@ -195,13 +195,17 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis ...@@ -195,13 +195,17 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
if (!parameters.contains("port")) { if (!parameters.contains("port")) {
throw new AnalysisException("Set a port to read from with option(\"port\", ...).") throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
} }
val schema = if (schema.nonEmpty) {
throw new AnalysisException("The socket source does not support a user-specified schema.")
}
val sourceSchema =
if (parseIncludeTimestamp(parameters)) { if (parseIncludeTimestamp(parameters)) {
TextSocketSource.SCHEMA_TIMESTAMP TextSocketSource.SCHEMA_TIMESTAMP
} else { } else {
TextSocketSource.SCHEMA_REGULAR TextSocketSource.SCHEMA_REGULAR
} }
("textSocket", schema) ("textSocket", sourceSchema)
} }
override def createSource( override def createSource(
......
...@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming ...@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest} import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
import org.apache.spark.util.ManualClock import org.apache.spark.util.ManualClock
...@@ -179,4 +180,15 @@ class RateSourceSuite extends StreamTest { ...@@ -179,4 +180,15 @@ class RateSourceSuite extends StreamTest {
testIllegalOptionValue("rowsPerSecond", "-1", Seq("-1", "rowsPerSecond", "positive")) testIllegalOptionValue("rowsPerSecond", "-1", Seq("-1", "rowsPerSecond", "positive"))
testIllegalOptionValue("numPartitions", "-1", Seq("-1", "numPartitions", "positive")) testIllegalOptionValue("numPartitions", "-1", Seq("-1", "numPartitions", "positive"))
} }
test("user-specified schema given") {
val exception = intercept[AnalysisException] {
spark.readStream
.format("rate")
.schema(spark.range(1).schema)
.load()
}
assert(exception.getMessage.contains(
"rate source does not support a user-specified schema"))
}
} }
...@@ -148,6 +148,21 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before ...@@ -148,6 +148,21 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
} }
} }
test("user-specified schema given") {
val provider = new TextSocketSourceProvider
val userSpecifiedSchema = StructType(
StructField("name", StringType) ::
StructField("area", StringType) :: Nil)
val exception = intercept[AnalysisException] {
provider.sourceSchema(
sqlContext, Some(userSpecifiedSchema),
"",
Map("host" -> "localhost", "port" -> "1234"))
}
assert(exception.getMessage.contains(
"socket source does not support a user-specified schema"))
}
test("no server up") { test("no server up") {
val provider = new TextSocketSourceProvider val provider = new TextSocketSourceProvider
val parameters = Map("host" -> "localhost", "port" -> "0") val parameters = Map("host" -> "localhost", "port" -> "0")
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment