diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 85d4a4a791e6b36685861fe26a84b8362f32d7c4..f7b1ec34ced767aa7b90ad0b51d2628fa96942a3 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -192,7 +192,7 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     checkRow(sql("SELECT * FROM datetime1 where id = 1").head())
   }
 
-  test("SPARK-20557: column type TIMEZONE with TIME STAMP should be recognized") {
+  test("SPARK-20557: column type TIMESTAMP with TIME ZONE should be recognized") {
     val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
     val rows = dfRead.collect()
     val types = rows(0).toSeq.map(x => x.getClass.toString)
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index a1a065a443e67975dc7087493f183c225e4fa8ee..eb3c458360e7b40d5e17a986a42f84b482226ec2 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -55,6 +55,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       + "null, null, null, null, null, "
       + "null, null, null, null, null, null, null)"
     ).executeUpdate()
+
+    conn.prepareStatement("CREATE TABLE ts_with_timezone " +
+      "(id integer, tstz TIMESTAMP WITH TIME ZONE, ttz TIME WITH TIME ZONE)")
+      .executeUpdate()
+    conn.prepareStatement("INSERT INTO ts_with_timezone VALUES " +
+      "(1, TIMESTAMP WITH TIME ZONE '2016-08-12 10:22:31.949271-07', TIME WITH TIME ZONE '17:22:31.949271+00')")
+      .executeUpdate()
   }
 
   test("Type mapping for various types") {
@@ -126,4 +133,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(schema(0).dataType == FloatType)
     assert(schema(1).dataType == ShortType)
   }
+
+  test("SPARK-20557: column type TIMESTAMP with TIME ZONE and TIME with TIME ZONE should be recognized") {
+    val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
+    val rows = dfRead.collect()
+    val types = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(types(1).equals("class java.sql.Timestamp"))
+    assert(types(2).equals("class java.sql.Timestamp"))
+  }
 }
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index e170133f0f0bfc1f853ad6fad871bd49431a82ad..fe4be963e8184028b09906ac39b00426c1831ce3 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -115,7 +115,7 @@
     <dependency>
       <groupId>com.h2database</groupId>
       <artifactId>h2</artifactId>
-      <version>1.4.183</version>
+      <version>1.4.195</version>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index fb877d1ca7639f94c89a0faee316dac9f30dfc7d..71eaab119d75dc3eae70099d9859aee7e12cc44d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
-import java.sql.{Connection, Driver, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
+import java.sql.{Connection, Driver, DriverManager, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -217,11 +217,14 @@ object JdbcUtils extends Logging {
       case java.sql.Types.OTHER         => null
       case java.sql.Types.REAL          => DoubleType
       case java.sql.Types.REF           => StringType
+      case java.sql.Types.REF_CURSOR    => null
       case java.sql.Types.ROWID         => LongType
       case java.sql.Types.SMALLINT      => IntegerType
       case java.sql.Types.SQLXML        => StringType
       case java.sql.Types.STRUCT        => StringType
       case java.sql.Types.TIME          => TimestampType
+      case java.sql.Types.TIME_WITH_TIMEZONE
+                                        => TimestampType
       case java.sql.Types.TIMESTAMP     => TimestampType
       case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
                                         => TimestampType
@@ -229,11 +232,14 @@ object JdbcUtils extends Logging {
       case java.sql.Types.TINYINT       => IntegerType
       case java.sql.Types.VARBINARY     => BinaryType
       case java.sql.Types.VARCHAR       => StringType
-      case _                            => null
+      case _                            =>
+        throw new SQLException("Unrecognized SQL type " + sqlType)
       // scalastyle:on
     }
 
-    if (answer == null) throw new SQLException("Unsupported type " + sqlType)
+    if (answer == null) {
+      throw new SQLException("Unsupported type " + JDBCType.valueOf(sqlType).getName)
+    }
     answer
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index e1049c665a4177687ef26244d31b1616fd6faab0..142b005850a49fbf605e198226fd36da6ca6d11e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.storage.StorageLevel
 
 
-
 /**
  * Internal implementation of the user-facing `Catalog`.
  */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 5bd36ec25ccb0e4a559d44b27dd5f5f9c836d797..d9f3689411ab7770cacc703280b043b0f330fbae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -18,13 +18,13 @@
 package org.apache.spark.sql.jdbc
 
 import java.math.BigDecimal
-import java.sql.{Date, DriverManager, Timestamp}
+import java.sql.{Date, DriverManager, SQLException, Timestamp}
 import java.util.{Calendar, GregorianCalendar, Properties}
 
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.DataSourceScanExec
@@ -141,6 +141,15 @@ class JDBCSuite extends SparkFunSuite
         |OPTIONS (url '$url', dbtable 'TEST.TIMETYPES', user 'testUser', password 'testPass')
        """.stripMargin.replaceAll("\n", " "))
 
+    conn.prepareStatement("CREATE TABLE test.timezone (tz TIMESTAMP WITH TIME ZONE) " +
+      "AS SELECT '1999-01-08 04:05:06.543543543 GMT-08:00'")
+      .executeUpdate()
+    conn.commit()
+
+    conn.prepareStatement("CREATE TABLE test.array (ar ARRAY) " +
+      "AS SELECT '(1, 2, 3)'")
+      .executeUpdate()
+    conn.commit()
 
     conn.prepareStatement("create table test.flttypes (a DOUBLE, b REAL, c DECIMAL(38, 18))"
         ).executeUpdate()
@@ -919,6 +928,17 @@ class JDBCSuite extends SparkFunSuite
     assert(res === (foobarCnt, 0L, foobarCnt) :: Nil)
   }
 
+  test("unsupported types") {
+    var e = intercept[SparkException] {
+      spark.read.jdbc(urlWithUserAndPass, "TEST.TIMEZONE", new Properties()).collect()
+    }.getMessage
+    assert(e.contains("java.lang.UnsupportedOperationException: unimplemented"))
+    e = intercept[SQLException] {
+      spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY", new Properties()).collect()
+    }.getMessage
+    assert(e.contains("Unsupported type ARRAY"))
+  }
+
   test("SPARK-19318: Connection properties keys should be case-sensitive.") {
     def testJdbcOptions(options: JDBCOptions): Unit = {
       // Spark JDBC data source options are case-insensitive