Skip to content
Snippets Groups Projects
Commit 12b7191d authored by Rick Hillegas's avatar Rick Hillegas Committed by Reynold Xin
Browse files

[SPARK-10855] [SQL] Add a JDBC dialect for Apache Derby

marmbrus
rxin

This patch adds a JdbcDialect class, which customizes the datatype mappings for Derby backends. The patch also adds unit tests for the new dialect, corresponding to the existing tests for other JDBC dialects.

JDBCSuite runs cleanly for me with this patch. So does JDBCWriteSuite, although it produces noise as described here: https://issues.apache.org/jira/browse/SPARK-10890

This patch is my original work, which I license to the ASF. I am a Derby contributor, so my ICLA is on file under SVN id "rhillegas": http://people.apache.org/committer-index.html

Touches the following files:

---------------------------------

org.apache.spark.sql.jdbc.JdbcDialects

Adds a DerbyDialect.

---------------------------------

org.apache.spark.sql.jdbc.JDBCSuite

Adds unit tests for the new DerbyDialect.

Author: Rick Hillegas <rhilleg@us.ibm.com>

Closes #8982 from rick-ibm/b_10855.
parent 015f7ef5
No related branches found
No related tags found
No related merge requests found
...@@ -138,6 +138,7 @@ object JdbcDialects { ...@@ -138,6 +138,7 @@ object JdbcDialects {
registerDialect(PostgresDialect) registerDialect(PostgresDialect)
registerDialect(DB2Dialect) registerDialect(DB2Dialect)
registerDialect(MsSqlServerDialect) registerDialect(MsSqlServerDialect)
registerDialect(DerbyDialect)
/** /**
...@@ -287,3 +288,30 @@ case object MsSqlServerDialect extends JdbcDialect { ...@@ -287,3 +288,30 @@ case object MsSqlServerDialect extends JdbcDialect {
case _ => None case _ => None
} }
} }
/**
* :: DeveloperApi ::
* Default Apache Derby dialect, mapping real on read
* and string/byte/short/boolean/decimal on write.
*/
@DeveloperApi
case object DerbyDialect extends JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:derby")
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
if (sqlType == Types.REAL) Option(FloatType) else None
}
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB))
case ByteType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
// 31 is the maximum precision and 5 is the default scale for a Derby DECIMAL
case (t: DecimalType) if (t.precision > 31) =>
Some(JdbcType("DECIMAL(31,5)", java.sql.Types.DECIMAL))
case _ => None
}
}
...@@ -409,18 +409,22 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext ...@@ -409,18 +409,22 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect) assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect)
assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect) assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect)
assert(JdbcDialects.get("jdbc:sqlserver://127.0.0.1/db") == MsSqlServerDialect) assert(JdbcDialects.get("jdbc:sqlserver://127.0.0.1/db") == MsSqlServerDialect)
assert(JdbcDialects.get("jdbc:derby:db") == DerbyDialect)
assert(JdbcDialects.get("test.invalid") == NoopDialect) assert(JdbcDialects.get("test.invalid") == NoopDialect)
} }
test("quote column names by jdbc dialect") { test("quote column names by jdbc dialect") {
val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db") val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
val Derby = JdbcDialects.get("jdbc:derby:db")
val columns = Seq("abc", "key") val columns = Seq("abc", "key")
val MySQLColumns = columns.map(MySQL.quoteIdentifier(_)) val MySQLColumns = columns.map(MySQL.quoteIdentifier(_))
val PostgresColumns = columns.map(Postgres.quoteIdentifier(_)) val PostgresColumns = columns.map(Postgres.quoteIdentifier(_))
val DerbyColumns = columns.map(Derby.quoteIdentifier(_))
assert(MySQLColumns === Seq("`abc`", "`key`")) assert(MySQLColumns === Seq("`abc`", "`key`"))
assert(PostgresColumns === Seq(""""abc"""", """"key"""")) assert(PostgresColumns === Seq(""""abc"""", """"key""""))
assert(DerbyColumns === Seq(""""abc"""", """"key""""))
} }
test("Dialect unregister") { test("Dialect unregister") {
...@@ -454,16 +458,23 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext ...@@ -454,16 +458,23 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
test("PostgresDialect type mapping") { test("PostgresDialect type mapping") {
val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
// SPARK-7869: Testing JSON types handling
assert(Postgres.getCatalystType(java.sql.Types.OTHER, "json", 1, null) === Some(StringType)) assert(Postgres.getCatalystType(java.sql.Types.OTHER, "json", 1, null) === Some(StringType))
assert(Postgres.getCatalystType(java.sql.Types.OTHER, "jsonb", 1, null) === Some(StringType)) assert(Postgres.getCatalystType(java.sql.Types.OTHER, "jsonb", 1, null) === Some(StringType))
} }
test("DerbyDialect jdbc type mapping") {
val derbyDialect = JdbcDialects.get("jdbc:derby:db")
assert(derbyDialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB")
assert(derbyDialect.getJDBCType(ByteType).map(_.databaseTypeDefinition).get == "SMALLINT")
assert(derbyDialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "BOOLEAN")
}
test("table exists query by jdbc dialect") { test("table exists query by jdbc dialect") {
val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db") val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
val db2 = JdbcDialects.get("jdbc:db2://127.0.0.1/db") val db2 = JdbcDialects.get("jdbc:db2://127.0.0.1/db")
val h2 = JdbcDialects.get(url) val h2 = JdbcDialects.get(url)
val derby = JdbcDialects.get("jdbc:derby:db")
val table = "weblogs" val table = "weblogs"
val defaultQuery = s"SELECT * FROM $table WHERE 1=0" val defaultQuery = s"SELECT * FROM $table WHERE 1=0"
val limitQuery = s"SELECT 1 FROM $table LIMIT 1" val limitQuery = s"SELECT 1 FROM $table LIMIT 1"
...@@ -471,5 +482,6 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext ...@@ -471,5 +482,6 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
assert(Postgres.getTableExistsQuery(table) == limitQuery) assert(Postgres.getTableExistsQuery(table) == limitQuery)
assert(db2.getTableExistsQuery(table) == defaultQuery) assert(db2.getTableExistsQuery(table) == defaultQuery)
assert(h2.getTableExistsQuery(table) == defaultQuery) assert(h2.getTableExistsQuery(table) == defaultQuery)
assert(derby.getTableExistsQuery(table) == defaultQuery)
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment