From a3c29fcbbda02c1528b4185bcb880c91077d480c Mon Sep 17 00:00:00 2001
From: "YIHAODIAN\\wangshuangshuang" <wangshuangshuang@yihaodian.com>
Date: Tue, 4 Jul 2017 09:44:27 -0700
Subject: [PATCH] [SPARK-19726][SQL] Failed to insert null timestamp value to
 MySQL using Spark JDBC

## What changes were proposed in this pull request?

Given a table created as follows:

> create table timestamp_test(id int(11), time_stamp timestamp not null default current_timestamp);

executing `insert into timestamp_test values (111, null)` produces a different result through Spark than through the MySQL client:

```
mysql> select * from timestamp_test;
+------+---------------------+
| id   | time_stamp          |
+------+---------------------+
|  111 | 1970-01-01 00:00:00 | -> spark
|  111 | 2017-06-27 19:32:38 | -> mysql
+------+---------------------+
2 rows in set (0.00 sec)
```

In this case `StructField.nullable` is false, so the code generated for `InvokeLike` and `BoundReference` does not check whether the field is null; it reads the value directly via `CodegenContext.INPUT_ROW.getLong(1)`. However, `UnsafeRow.setNullAt(1)` puts 0 in the underlying memory, so that 0 is what gets written to MySQL.

This PR *always* sets `StructField.nullable` to true after obtaining metadata from the JDBC connection, since MySQL allows inserting null into a NOT NULL timestamp column. This way Spark propagates the null to the underlying DB engine and lets the DB choose how to process NULL.
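For context, a minimal repro sketch of the scenario above, run through Spark SQL against a JDBC-backed table. The session setup, URL, and credentials are placeholders and assume a reachable MySQL instance with the table already created; they are not part of this patch:

```scala
import org.apache.spark.sql.SparkSession

// Placeholder session; in spark-shell a `spark` session already exists.
val spark = SparkSession.builder().appName("SPARK-19726 repro").getOrCreate()

// Register the existing MySQL table as a JDBC-backed Spark table
// (placeholder URL and credentials).
spark.sql(
  """
    |CREATE TABLE timestamp_test
    |USING jdbc
    |OPTIONS (
    |  url 'jdbc:mysql://localhost:3306/test',
    |  dbtable 'timestamp_test',
    |  user 'root',
    |  password 'secret'
    |)
  """.stripMargin)

// Before this patch, the schema resolved from JDBC metadata marked time_stamp
// as non-nullable, so the generated code skipped the null check and wrote the
// raw 0L that UnsafeRow.setNullAt leaves behind; MySQL stores that as
// 1970-01-01 00:00:00 instead of applying its column default.
spark.sql("INSERT INTO timestamp_test VALUES (111, null)")
```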
## How was this patch tested?

Added tests.

Author: YIHAODIAN\wangshuangshuang <wangshuangshuang@yihaodian.com>
Author: Shuangshuang Wang <wsszone@gmail.com>

Closes #18445 from shuangshuangwang/SPARK-19726.

---
 .../sql/execution/datasources/jdbc/JDBCRDD.scala   |  2 +-
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 12 ++++++++++--
 .../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala |  8 ++++++++
 3 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 0f53b5c7c6..57e9bc9b70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -59,7 +59,7 @@ object JDBCRDD extends Logging {
     try {
       val rs = statement.executeQuery()
       try {
-        JdbcUtils.getSchema(rs, dialect)
+        JdbcUtils.getSchema(rs, dialect, alwaysNullable = true)
       } finally {
         rs.close()
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index ca61c2efe2..55b2539c13 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -266,10 +266,14 @@ object JdbcUtils extends Logging {
   /**
    * Takes a [[ResultSet]] and returns its Catalyst schema.
    *
+   * @param alwaysNullable If true, all the columns are nullable.
    * @return A [[StructType]] giving the Catalyst schema.
    * @throws SQLException if the schema contains an unsupported type.
    */
-  def getSchema(resultSet: ResultSet, dialect: JdbcDialect): StructType = {
+  def getSchema(
+      resultSet: ResultSet,
+      dialect: JdbcDialect,
+      alwaysNullable: Boolean = false): StructType = {
     val rsmd = resultSet.getMetaData
     val ncols = rsmd.getColumnCount
     val fields = new Array[StructField](ncols)
@@ -290,7 +294,11 @@ object JdbcUtils extends Logging {
             rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" => true
         }
       }
-      val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      val nullable = if (alwaysNullable) {
+        true
+      } else {
+        rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      }
       val metadata = new MetadataBuilder()
         .putString("name", columnName)
         .putLong("scale", fieldScale)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index bf1fd16070..92f50a095f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters.propertiesAsScalaMapConverter
 
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -506,4 +507,11 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
         "schema struct<name:string,id:int>"))
     }
   }
+
+  test("SPARK-19726: INSERT null to a NOT NULL column") {
+    val e = intercept[SparkException] {
+      sql("INSERT INTO PEOPLE1 values (null, null)")
+    }.getMessage
+    assert(e.contains("NULL not allowed for column \"NAME\""))
+  }
 }
--
GitLab