diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala index d6b3fb3291a2e91be5b16c8cddc210087a388e25..93e82549f213b15805be075dafc3f905f5853ca9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.Partition import org.apache.spark.rdd.RDD +import org.apache.spark.sql.DataFrame import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.sources._ @@ -129,7 +130,8 @@ private[sql] case class JDBCRelation( parts: Array[Partition], properties: Properties = new Properties())(@transient val sqlContext: SQLContext) extends BaseRelation - with PrunedFilteredScan { + with PrunedFilteredScan + with InsertableRelation { override val needConversion: Boolean = false @@ -148,4 +150,8 @@ private[sql] case class JDBCRelation( filters, parts) } + + override def insert(data: DataFrame, overwrite: Boolean): Unit = { + data.insertIntoJDBC(url, table, overwrite, properties) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index f3ce8e66460e54aecd8f89f3613ce7e48e66a379..0800eded443de608d339ee3a2f703774e2b74d23 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -43,6 +43,29 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter { conn1 = DriverManager.getConnection(url1, properties) conn1.prepareStatement("create schema test").executeUpdate() + conn1.prepareStatement("drop table if exists test.people").executeUpdate() + conn1.prepareStatement( + "create table test.people (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate() + conn1.prepareStatement("insert into test.people values ('fred', 1)").executeUpdate() + conn1.prepareStatement("insert into test.people values ('mary', 2)").executeUpdate() + conn1.prepareStatement("drop table if exists test.people1").executeUpdate() + conn1.prepareStatement( + "create table test.people1 (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate() + conn1.commit() + + TestSQLContext.sql( + s""" + |CREATE TEMPORARY TABLE PEOPLE + |USING org.apache.spark.sql.jdbc + |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass') + """.stripMargin.replaceAll("\n", " ")) + + TestSQLContext.sql( + s""" + |CREATE TEMPORARY TABLE PEOPLE1 + |USING org.apache.spark.sql.jdbc + |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE1', user 'testUser', password 'testPass') + """.stripMargin.replaceAll("\n", " ")) } after { @@ -114,5 +137,17 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter { df2.insertIntoJDBC(url, "TEST.INCOMPATIBLETEST", true) } } - + + test("INSERT to JDBC Datasource") { + TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE") + assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count) + assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) + } + + test("INSERT to JDBC Datasource with overwrite") { + TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE") + TestSQLContext.sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE") + assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count) + assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) + } }