Skip to content
Snippets Groups Projects
Commit 64c29afc authored by sureshthalamati's avatar sureshthalamati Committed by Yin Huai
Browse files

[SPARK-9078] [SQL] Allow jdbc dialects to override the query used to check the table.

Current implementation uses query with a LIMIT clause to find if table already exists. This syntax works only in some database systems. This patch changes the default query to the one that is likely to work on most databases, and adds a new method to the  JdbcDialect abstract class to allow  dialects to override the default query.

I looked at using the JDBC meta data calls, it turns out there is no common way to find the current schema, catalog..etc.  There is a new method Connection.getSchema() , but that is available only starting jdk1.7 , and existing jdbc drivers may not have implemented it.  Other option was to use jdbc escape syntax clause for LIMIT, not sure on how well this supported in all the databases also. After looking at all the jdbc metadata options my conclusion was most common way is to use the simple select query with 'where 1 =0' , and allow dialects to customize as needed

Author: sureshthalamati <suresh.thalamati@gmail.com>

Closes #8676 from sureshthalamati/table_exists_spark-9078.
parent 35a19f33
No related branches found
No related tags found
No related merge requests found
...@@ -255,7 +255,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { ...@@ -255,7 +255,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
val conn = JdbcUtils.createConnection(url, props) val conn = JdbcUtils.createConnection(url, props)
try { try {
var tableExists = JdbcUtils.tableExists(conn, table) var tableExists = JdbcUtils.tableExists(conn, url, table)
if (mode == SaveMode.Ignore && tableExists) { if (mode == SaveMode.Ignore && tableExists) {
return return
......
...@@ -42,10 +42,13 @@ object JdbcUtils extends Logging { ...@@ -42,10 +42,13 @@ object JdbcUtils extends Logging {
/** /**
* Returns true if the table already exists in the JDBC database. * Returns true if the table already exists in the JDBC database.
*/ */
def tableExists(conn: Connection, table: String): Boolean = { def tableExists(conn: Connection, url: String, table: String): Boolean = {
val dialect = JdbcDialects.get(url)
// Somewhat hacky, but there isn't a good way to identify whether a table exists for all // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
// SQL database systems, considering "table" could also include the database name. // SQL database systems using JDBC meta data calls, considering "table" could also include
Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess // the database name. Query used to find table exists can be overriden by the dialects.
Try(conn.prepareStatement(dialect.getTableExistsQuery(table)).executeQuery()).isSuccess
} }
/** /**
......
...@@ -88,6 +88,17 @@ abstract class JdbcDialect { ...@@ -88,6 +88,17 @@ abstract class JdbcDialect {
def quoteIdentifier(colName: String): String = { def quoteIdentifier(colName: String): String = {
s""""$colName"""" s""""$colName""""
} }
/**
* Get the SQL query that should be used to find if the given table exists. Dialects can
* override this method to return a query that works best in a particular database.
* @param table The name of the table.
* @return The SQL query to use for checking the table.
*/
def getTableExistsQuery(table: String): String = {
s"SELECT * FROM $table WHERE 1=0"
}
} }
/** /**
...@@ -198,6 +209,11 @@ case object PostgresDialect extends JdbcDialect { ...@@ -198,6 +209,11 @@ case object PostgresDialect extends JdbcDialect {
case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN)) case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
case _ => None case _ => None
} }
override def getTableExistsQuery(table: String): String = {
s"SELECT 1 FROM $table LIMIT 1"
}
} }
/** /**
...@@ -222,6 +238,10 @@ case object MySQLDialect extends JdbcDialect { ...@@ -222,6 +238,10 @@ case object MySQLDialect extends JdbcDialect {
override def quoteIdentifier(colName: String): String = { override def quoteIdentifier(colName: String): String = {
s"`$colName`" s"`$colName`"
} }
override def getTableExistsQuery(table: String): String = {
s"SELECT 1 FROM $table LIMIT 1"
}
} }
/** /**
......
...@@ -450,4 +450,18 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext ...@@ -450,4 +450,18 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
assert(db2Dialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB") assert(db2Dialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB")
assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "CHAR(1)") assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "CHAR(1)")
} }
test("table exists query by jdbc dialect") {
val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
val db2 = JdbcDialects.get("jdbc:db2://127.0.0.1/db")
val h2 = JdbcDialects.get(url)
val table = "weblogs"
val defaultQuery = s"SELECT * FROM $table WHERE 1=0"
val limitQuery = s"SELECT 1 FROM $table LIMIT 1"
assert(MySQL.getTableExistsQuery(table) == limitQuery)
assert(Postgres.getTableExistsQuery(table) == limitQuery)
assert(db2.getTableExistsQuery(table) == defaultQuery)
assert(h2.getTableExistsQuery(table) == defaultQuery)
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment