diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 40f5c9fec8bb8bfd9ebd28250616e916557f657c..dacef911e397e9a8aab1aa17c585d7cfb64e702d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -116,7 +116,24 @@ object MimaExcludes {
           "org.apache.spark.rdd.MapPartitionsWithPreparationRDD$")
       ) ++ Seq(
         // SPARK-11485
-        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.DataFrameHolder.df")
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.DataFrameHolder.df"),
+        // SPARK-11541 mark various JDBC dialects as private
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productElement"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productArity"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.canEqual"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productIterator"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productPrefix"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.toString"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.hashCode"),
+        ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.jdbc.PostgresDialect$"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productElement"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productArity"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.canEqual"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productIterator"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productPrefix"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.toString"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.hashCode"),
+        ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.jdbc.NoopDialect$")
       )
     case v if v.startsWith("1.5") =>
       Seq(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index 7cf66b65c8722cdfcd020e3252f11fca4e1da884..f9eab5c2e965bb4595062e6633cfeb39c64973d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.types.NumericType
 class GroupedData protected[sql](
     df: DataFrame,
     groupingExprs: Seq[Expression],
-    private val groupType: GroupedData.GroupType) {
+    groupType: GroupedData.GroupType) {
 
   private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
     val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
new file mode 100644
index 0000000000000000000000000000000000000000..467d8d62d1b7f2729b9bb4d939b67a4a6d71673b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import org.apache.spark.sql.types.{DataType, MetadataBuilder}
+
+/**
+ * AggregatedDialect can unify multiple dialects into one virtual Dialect.
+ * Dialects are tried in order, and the first dialect that does not return a
+ * neutral element will will.
+ *
+ * @param dialects List of dialects.
+ */
+private class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
+
+  require(dialects.nonEmpty)
+
+  override def canHandle(url : String): Boolean =
+    dialects.map(_.canHandle(url)).reduce(_ && _)
+
+  override def getCatalystType(
+      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+    dialects.flatMap(_.getCatalystType(sqlType, typeName, size, md)).headOption
+  }
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = {
+    dialects.flatMap(_.getJDBCType(dt)).headOption
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
new file mode 100644
index 0000000000000000000000000000000000000000..b1cb0e55026bee66427c37bf3949ce4ceecf0b80
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import org.apache.spark.sql.types.{BooleanType, StringType, DataType}
+
+
+private object DB2Dialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2")
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+    case StringType => Option(JdbcType("CLOB", java.sql.Types.CLOB))
+    case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
+    case _ => None
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
new file mode 100644
index 0000000000000000000000000000000000000000..84f68e779c38cd701439d4b494b66a445d9d064a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Types
+
+import org.apache.spark.sql.types._
+
+
+private object DerbyDialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean = url.startsWith("jdbc:derby")
+
+  override def getCatalystType(
+      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+    if (sqlType == Types.REAL) Option(FloatType) else None
+  }
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+    case StringType => Option(JdbcType("CLOB", java.sql.Types.CLOB))
+    case ByteType => Option(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
+    case ShortType => Option(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
+    case BooleanType => Option(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
+    // 31 is the maximum precision and 5 is the default scale for a Derby DECIMAL
+    case t: DecimalType if t.precision > 31 =>
+      Option(JdbcType("DECIMAL(31,5)", java.sql.Types.DECIMAL))
+    case _ => None
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index f9a6a09b6270df351b9cac5e7a4811b0506bd88e..14bfea4e3e287efc03c828f7cfd4140317bdd26d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.Types
-
 import org.apache.spark.sql.types._
 import org.apache.spark.annotation.DeveloperApi
 
@@ -115,11 +113,10 @@ abstract class JdbcDialect {
 @DeveloperApi
 object JdbcDialects {
 
-  private var dialects = List[JdbcDialect]()
-
   /**
    * Register a dialect for use on all new matching jdbc [[org.apache.spark.sql.DataFrame]].
    * Readding an existing dialect will cause a move-to-front.
+   *
    * @param dialect The new dialect.
    */
   def registerDialect(dialect: JdbcDialect) : Unit = {
@@ -128,12 +125,15 @@ object JdbcDialects {
 
   /**
    * Unregister a dialect. Does nothing if the dialect is not registered.
+   *
    * @param dialect The jdbc dialect.
    */
   def unregisterDialect(dialect : JdbcDialect) : Unit = {
     dialects = dialects.filterNot(_ == dialect)
   }
 
+  private[this] var dialects = List[JdbcDialect]()
+
   registerDialect(MySQLDialect)
   registerDialect(PostgresDialect)
   registerDialect(DB2Dialect)
@@ -141,7 +141,6 @@ object JdbcDialects {
   registerDialect(DerbyDialect)
   registerDialect(OracleDialect)
 
-
   /**
    * Fetch the JdbcDialect class corresponding to a given database url.
    */
@@ -156,187 +155,8 @@ object JdbcDialects {
 }
 
 /**
- * :: DeveloperApi ::
- * AggregatedDialect can unify multiple dialects into one virtual Dialect.
- * Dialects are tried in order, and the first dialect that does not return a
- * neutral element will will.
- * @param dialects List of dialects.
- */
-@DeveloperApi
-class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
-
-  require(dialects.nonEmpty)
-
-  override def canHandle(url : String): Boolean =
-    dialects.map(_.canHandle(url)).reduce(_ && _)
-
-  override def getCatalystType(
-      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    dialects.flatMap(_.getCatalystType(sqlType, typeName, size, md)).headOption
-  }
-
-  override def getJDBCType(dt: DataType): Option[JdbcType] = {
-    dialects.flatMap(_.getJDBCType(dt)).headOption
-  }
-}
-
-/**
- * :: DeveloperApi ::
  * NOOP dialect object, always returning the neutral element.
  */
-@DeveloperApi
-case object NoopDialect extends JdbcDialect {
+private object NoopDialect extends JdbcDialect {
   override def canHandle(url : String): Boolean = true
 }
-
-/**
- * :: DeveloperApi ::
- * Default postgres dialect, mapping bit/cidr/inet on read and string/binary/boolean on write.
- */
-@DeveloperApi
-case object PostgresDialect extends JdbcDialect {
-  override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")
-  override def getCatalystType(
-      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
-      Option(BinaryType)
-    } else if (sqlType == Types.OTHER && typeName.equals("cidr")) {
-      Option(StringType)
-    } else if (sqlType == Types.OTHER && typeName.equals("inet")) {
-      Option(StringType)
-    } else if (sqlType == Types.OTHER && typeName.equals("json")) {
-      Option(StringType)
-    } else if (sqlType == Types.OTHER && typeName.equals("jsonb")) {
-      Option(StringType)
-    } else None
-  }
-
-  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
-    case StringType => Some(JdbcType("TEXT", java.sql.Types.CHAR))
-    case BinaryType => Some(JdbcType("BYTEA", java.sql.Types.BINARY))
-    case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
-    case _ => None
-  }
-
-  override def getTableExistsQuery(table: String): String = {
-    s"SELECT 1 FROM $table LIMIT 1"
-  }
-
-}
-
-/**
- * :: DeveloperApi ::
- * Default mysql dialect to read bit/bitsets correctly.
- */
-@DeveloperApi
-case object MySQLDialect extends JdbcDialect {
-  override def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
-  override def getCatalystType(
-      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
-      // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
-      // byte arrays instead of longs.
-      md.putLong("binarylong", 1)
-      Option(LongType)
-    } else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
-      Option(BooleanType)
-    } else None
-  }
-
-  override def quoteIdentifier(colName: String): String = {
-    s"`$colName`"
-  }
-
-  override def getTableExistsQuery(table: String): String = {
-    s"SELECT 1 FROM $table LIMIT 1"
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * Default DB2 dialect, mapping string/boolean on write to valid DB2 types.
- * By default string, and boolean gets mapped to db2 invalid types TEXT, and BIT(1).
- */
-@DeveloperApi
-case object DB2Dialect extends JdbcDialect {
-
-  override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2")
-
-  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
-    case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB))
-    case BooleanType => Some(JdbcType("CHAR(1)", java.sql.Types.CHAR))
-    case _ => None
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * Default Microsoft SQL Server dialect, mapping the datetimeoffset types to a String on read.
- */
-@DeveloperApi
-case object MsSqlServerDialect extends JdbcDialect {
-  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")
-  override def getCatalystType(
-      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    if (typeName.contains("datetimeoffset")) {
-      // String is recommend by Microsoft SQL Server for datetimeoffset types in non-MS clients
-      Option(StringType)
-    } else None
-  }
-
-  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
-    case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
-    case _ => None
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * Default Apache Derby dialect, mapping real on read
- * and string/byte/short/boolean/decimal on write.
- */
-@DeveloperApi
-case object DerbyDialect extends JdbcDialect {
-  override def canHandle(url: String): Boolean = url.startsWith("jdbc:derby")
-  override def getCatalystType(
-      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    if (sqlType == Types.REAL) Option(FloatType) else None
-  }
-
-  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
-    case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB))
-    case ByteType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
-    case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
-    case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
-    // 31 is the maximum precision and 5 is the default scale for a Derby DECIMAL
-    case (t: DecimalType) if (t.precision > 31) =>
-      Some(JdbcType("DECIMAL(31,5)", java.sql.Types.DECIMAL))
-    case _ => None
-  }
-
-}
-
-/**
- * :: DeveloperApi ::
- * Default Oracle dialect, mapping a nonspecific numeric type to a general decimal type.
- */
-@DeveloperApi
-case object OracleDialect extends JdbcDialect {
-  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
-  override def getCatalystType(
-      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    // Handle NUMBER fields that have no precision/scale in special way
-    // because JDBC ResultSetMetaData converts this to 0 procision and -127 scale
-    // For more details, please see
-    // https://github.com/apache/spark/pull/8780#issuecomment-145598968
-    // and
-    // https://github.com/apache/spark/pull/8780#issuecomment-144541760
-    if (sqlType == Types.NUMERIC && size == 0) {
-      // This is sub-optimal as we have to pick a precision/scale in advance whereas the data
-      //  in Oracle is allowed to have different precision/scale for each value.
-      Some(DecimalType(DecimalType.MAX_PRECISION, 10))
-    } else {
-      None
-    }
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
new file mode 100644
index 0000000000000000000000000000000000000000..3eb722b070d5d1cce0d5de118d3bccefa49a1cdb
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import org.apache.spark.sql.types._
+
+
+private object MsSqlServerDialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")
+
+  override def getCatalystType(
+      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+    if (typeName.contains("datetimeoffset")) {
+      // String is recommend by Microsoft SQL Server for datetimeoffset types in non-MS clients
+      Option(StringType)
+    } else {
+      None
+    }
+  }
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+    case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
+    case _ => None
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
new file mode 100644
index 0000000000000000000000000000000000000000..da413ed1f08b54261eb6b9088b28cffc3047c2f0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Types
+
+import org.apache.spark.sql.types.{BooleanType, LongType, DataType, MetadataBuilder}
+
+
+private case object MySQLDialect extends JdbcDialect {
+
+  override def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
+
+  override def getCatalystType(
+      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+    if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
+      // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
+      // byte arrays instead of longs.
+      md.putLong("binarylong", 1)
+      Option(LongType)
+    } else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
+      Option(BooleanType)
+    } else None
+  }
+
+  override def quoteIdentifier(colName: String): String = {
+    s"`$colName`"
+  }
+
+  override def getTableExistsQuery(table: String): String = {
+    s"SELECT 1 FROM $table LIMIT 1"
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4165c382689f9044bcd699efb0174eb4dcf195a0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Types
+
+import org.apache.spark.sql.types._
+
+
+private case object OracleDialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
+
+  override def getCatalystType(
+      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+    // Handle NUMBER fields that have no precision/scale in special way
+    // because JDBC ResultSetMetaData converts this to 0 procision and -127 scale
+    // For more details, please see
+    // https://github.com/apache/spark/pull/8780#issuecomment-145598968
+    // and
+    // https://github.com/apache/spark/pull/8780#issuecomment-144541760
+    if (sqlType == Types.NUMERIC && size == 0) {
+      // This is sub-optimal as we have to pick a precision/scale in advance whereas the data
+      //  in Oracle is allowed to have different precision/scale for each value.
+      Option(DecimalType(DecimalType.MAX_PRECISION, 10))
+    } else {
+      None
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e701a7fcd9e1664fe3fb4318c4002cf476dfae3a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Types
+
+import org.apache.spark.sql.types._
+
+
+private object PostgresDialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")
+
+  override def getCatalystType(
+      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+    if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
+      Option(BinaryType)
+    } else if (sqlType == Types.OTHER && typeName.equals("cidr")) {
+      Option(StringType)
+    } else if (sqlType == Types.OTHER && typeName.equals("inet")) {
+      Option(StringType)
+    } else if (sqlType == Types.OTHER && typeName.equals("json")) {
+      Option(StringType)
+    } else if (sqlType == Types.OTHER && typeName.equals("jsonb")) {
+      Option(StringType)
+    } else None
+  }
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+    case StringType => Some(JdbcType("TEXT", java.sql.Types.CHAR))
+    case BinaryType => Some(JdbcType("BYTEA", java.sql.Types.BINARY))
+    case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
+    case _ => None
+  }
+
+  override def getTableExistsQuery(table: String): String = {
+    s"SELECT 1 FROM $table LIMIT 1"
+  }
+}