diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 0f53b5c7c6f0fbccf259cf32526814c581cf6235..57e9bc9b70454a6ca803c984aa904b0b0a325b73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -59,7 +59,7 @@ object JDBCRDD extends Logging { try { val rs = statement.executeQuery() try { - JdbcUtils.getSchema(rs, dialect) + JdbcUtils.getSchema(rs, dialect, alwaysNullable = true) } finally { rs.close() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index ca61c2efe2ddfcebabb1fac7656298b71532ba29..55b2539c13381c9e8d563ca06e3c03ced6eeaa56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -266,10 +266,14 @@ object JdbcUtils extends Logging { /** * Takes a [[ResultSet]] and returns its Catalyst schema. * + * @param alwaysNullable If true, all the columns are nullable. * @return A [[StructType]] giving the Catalyst schema. * @throws SQLException if the schema contains an unsupported type. */ - def getSchema(resultSet: ResultSet, dialect: JdbcDialect): StructType = { + def getSchema( + resultSet: ResultSet, + dialect: JdbcDialect, + alwaysNullable: Boolean = false): StructType = { val rsmd = resultSet.getMetaData val ncols = rsmd.getColumnCount val fields = new Array[StructField](ncols) @@ -290,7 +294,11 @@ object JdbcUtils extends Logging { rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" => true } } - val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls + val nullable = if (alwaysNullable) { + true + } else { + rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls + } val metadata = new MetadataBuilder() .putString("name", columnName) .putLong("scale", fieldScale) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index bf1fd160704fa5c0e71c519bba4036d13351544f..92f50a095f19bc56d30254c3b6f2de6034dcc5da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters.propertiesAsScalaMapConverter import org.scalatest.BeforeAndAfter +import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} @@ -506,4 +507,11 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { "schema struct<name:string,id:int>")) } } + + test("SPARK-19726: INSERT null to a NOT NULL column") { + val e = intercept[SparkException] { + sql("INSERT INTO PEOPLE1 values (null, null)") + }.getMessage + assert(e.contains("NULL not allowed for column \"NAME\"")) + } }