Commit a3c29fcb authored by YIHAODIAN\wangshuangshuang, committed by gatorsmile

[SPARK-19726][SQL] Failed to insert null timestamp value into MySQL using Spark JDBC

## What changes were proposed in this pull request?

When creating a table such as the following:
> create table timestamp_test(id int(11), time_stamp timestamp not null default current_timestamp);

the result of executing "insert into timestamp_test values (111, null)" through Spark differs from executing the same statement directly in MySQL:
```
mysql> select * from timestamp_test;
+------+---------------------+
| id   | time_stamp          |
+------+---------------------+
|  111 | 1970-01-01 00:00:00 | -> spark
|  111 | 2017-06-27 19:32:38 | -> mysql
+------+---------------------+
2 rows in set (0.00 sec)
```
Because `StructField.nullable` is false in this case, the code generated for `InvokeLike` and `BoundReference` does not check whether the field is null; it directly calls `CodegenContext.INPUT_ROW.getLong(1)`. However, `UnsafeRow.setNullAt(1)` stores 0 in the underlying memory, so the null timestamp is read back as 0, which MySQL stores as 1970-01-01 00:00:00.
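To make the failure mode concrete, here is a small illustrative sketch using Catalyst's internal row machinery (not part of this patch; `UnsafeProjection` and `InternalRow` are internal APIs whose exact signatures may vary between Spark versions):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._

// Schema mirroring the MySQL table above.
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("time_stamp", TimestampType)))

// Project a row with a null timestamp into an UnsafeRow.
val toUnsafe = UnsafeProjection.create(schema)
val row = toUnsafe(InternalRow(111, null))

row.isNullAt(1)  // true: the null bit is set for the field
row.getLong(1)   // 0: the underlying 8-byte slot was zeroed when the null was written
```

When the column is (incorrectly) marked non-nullable, the generated code skips the `isNullAt(1)` check and reads the raw slot via `getLong(1)`, which is how 0, i.e. 1970-01-01 00:00:00, reaches MySQL.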

This PR always sets `StructField.nullable` to true after obtaining the metadata from the JDBC connection, since MySQL allows inserting null into a NOT NULL timestamp column. This way, Spark propagates the null to the underlying DB engine and lets the database decide how to handle NULL.
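As an illustration of the intended behavior after this change (a minimal sketch, not part of the patch; the JDBC URL, user, and password below are placeholder assumptions for a local MySQL instance holding the table created above):

```scala
// Register the MySQL table as a JDBC-backed temporary view.
spark.sql(
  """
    |CREATE TEMPORARY VIEW timestamp_test
    |USING org.apache.spark.sql.jdbc
    |OPTIONS (
    |  url 'jdbc:mysql://localhost:3306/test',
    |  dbtable 'timestamp_test',
    |  user 'root',
    |  password 'secret'
    |)
  """.stripMargin)

// With alwaysNullable = true, the resolved Catalyst schema marks time_stamp as
// nullable, so Spark sends a real NULL and MySQL applies DEFAULT CURRENT_TIMESTAMP.
spark.sql("INSERT INTO timestamp_test VALUES (111, null)")
```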

## How was this patch tested?

Added a unit test to JDBCWriteSuite.


Author: YIHAODIAN\wangshuangshuang <wangshuangshuang@yihaodian.com>
Author: Shuangshuang Wang <wsszone@gmail.com>

Closes #18445 from shuangshuangwang/SPARK-19726.
parent 29b1f6b0
@@ -59,7 +59,7 @@ object JDBCRDD extends Logging {
     try {
       val rs = statement.executeQuery()
       try {
-        JdbcUtils.getSchema(rs, dialect)
+        JdbcUtils.getSchema(rs, dialect, alwaysNullable = true)
       } finally {
         rs.close()
       }
@@ -266,10 +266,14 @@ object JdbcUtils extends Logging {
   /**
    * Takes a [[ResultSet]] and returns its Catalyst schema.
    *
+   * @param alwaysNullable If true, all the columns are nullable.
    * @return A [[StructType]] giving the Catalyst schema.
    * @throws SQLException if the schema contains an unsupported type.
    */
-  def getSchema(resultSet: ResultSet, dialect: JdbcDialect): StructType = {
+  def getSchema(
+      resultSet: ResultSet,
+      dialect: JdbcDialect,
+      alwaysNullable: Boolean = false): StructType = {
     val rsmd = resultSet.getMetaData
     val ncols = rsmd.getColumnCount
     val fields = new Array[StructField](ncols)
@@ -290,7 +294,11 @@ object JdbcUtils extends Logging {
             rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" => true
         }
       }
-      val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      val nullable = if (alwaysNullable) {
+        true
+      } else {
+        rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      }
       val metadata = new MetadataBuilder()
         .putString("name", columnName)
         .putLong("scale", fieldScale)
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters.propertiesAsScalaMapConverter
 import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -506,4 +507,11 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
         "schema struct<name:string,id:int>"))
     }
   }
+
+  test("SPARK-19726: INSERT null to a NOT NULL column") {
+    val e = intercept[SparkException] {
+      sql("INSERT INTO PEOPLE1 values (null, null)")
+    }.getMessage
+    assert(e.contains("NULL not allowed for column \"NAME\""))
+  }
 }