From 4816c2ef5e04eb2dd70bed8b99882aa0b7fe7fd7 Mon Sep 17 00:00:00 2001
From: Kirby Linvill <kirby.linvill@teradata.com>
Date: Tue, 23 May 2017 12:00:58 -0700
Subject: [PATCH] [SPARK-15648][SQL] Add teradataDialect for JDBC connection to
 Teradata
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The contribution is my original work and I license the work to the project under the project’s open source license.

Note: the Teradata JDBC connector limits the row size to 64K. The default string datatype equivalent I used is a 255 character/byte length varchar. This effectively limits the max number of string columns to 250 when using the Teradata jdbc connector.

## What changes were proposed in this pull request?

Added a teradataDialect for JDBC connection to Teradata. The Teradata dialect uses VARCHAR(255) in place of TEXT for string datatypes, and CHAR(1) in place of BIT(1) for boolean datatypes.

## How was this patch tested?

I added two unit tests to double check that the types get set correctly for a teradata jdbc url. I also ran a couple manual tests to make sure the jdbc connector worked with teradata and to make sure that an error was thrown if a row could potentially exceed 64K (this error comes from the teradata jdbc connector, not from the spark code). I did not check how string columns longer than 255 characters are handled.

Author: Kirby Linvill <kirby.linvill@teradata.com>
Author: klinvill <kjlinvill@gmail.com>

Closes #16746 from klinvill/master.
---
 .../apache/spark/sql/jdbc/JdbcDialects.scala  |  1 +
 .../spark/sql/jdbc/TeradataDialect.scala      | 34 +++++++++++++++++++
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 12 +++++++
 3 files changed, 47 insertions(+)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index e328b86437..a86a86d408 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -174,6 +174,7 @@ object JdbcDialects {
   registerDialect(MsSqlServerDialect)
   registerDialect(DerbyDialect)
   registerDialect(OracleDialect)
+  registerDialect(TeradataDialect)
 
   /**
    * Fetch the JdbcDialect class corresponding to a given database url.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
new file mode 100644
index 0000000000..5749b791fc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Types
+
+import org.apache.spark.sql.types._
+
+
+private case object TeradataDialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean = { url.startsWith("jdbc:teradata") }
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+    case StringType => Some(JdbcType("VARCHAR(255)", java.sql.Types.VARCHAR))
+    case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
+    case _ => None
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index d9f3689411..70bee929b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -922,6 +922,18 @@ class JDBCSuite extends SparkFunSuite
     assert(e2.contains("User specified schema not supported with `jdbc`"))
   }
 
+  test("SPARK-15648: teradataDialect StringType data mapping") {
+    val teradataDialect = JdbcDialects.get("jdbc:teradata://127.0.0.1/db")
+    assert(teradataDialect.getJDBCType(StringType).
+      map(_.databaseTypeDefinition).get == "VARCHAR(255)")
+  }
+
+  test("SPARK-15648: teradataDialect BooleanType data mapping") {
+    val teradataDialect = JdbcDialects.get("jdbc:teradata://127.0.0.1/db")
+    assert(teradataDialect.getJDBCType(BooleanType).
+      map(_.databaseTypeDefinition).get == "CHAR(1)")
+  }
+
   test("Checking metrics correctness with JDBC") {
     val foobarCnt = spark.table("foobar").count()
     val res = InputOutputMetricsHelper.run(sql("SELECT * FROM foobar").toDF())
-- 
GitLab