diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index c5e1f8607b3336e26daae04793446918ca8d29ce..8c880f3ee5fa22034cb780e56bca3f4a1d66da22 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.Connection
+import java.sql.{Connection, Date, Timestamp}
 import java.util.Properties
 
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest
 
 /**
@@ -77,4 +79,74 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     // verify that the inserted value is read back correctly
     assert(rows(0).getString(0).equals("foo"))
   }
+
+  test("SPARK-16625: General data types to be mapped to Oracle") {
+    val props = new Properties()
+    props.put("oracle.jdbc.mapDateToTimestamp", "false")
+
+    val schema = StructType(Seq(
+      StructField("boolean_type", BooleanType, true),
+      StructField("integer_type", IntegerType, true),
+      StructField("long_type", LongType, true),
+      StructField("float_Type", FloatType, true),
+      StructField("double_type", DoubleType, true),
+      StructField("byte_type", ByteType, true),
+      StructField("short_type", ShortType, true),
+      StructField("string_type", StringType, true),
+      StructField("binary_type", BinaryType, true),
+      StructField("date_type", DateType, true),
+      StructField("timestamp_type", TimestampType, true)
+    ))
+
+    val tableName = "test_oracle_general_types"
+    val booleanVal = true
+    val integerVal = 1
+    val longVal = 2L
+    val floatVal = 3.0f
+    val doubleVal = 4.0
+    val byteVal = 2.toByte
+    val shortVal = 5.toShort
+    val stringVal = "string"
+    val binaryVal = Array[Byte](6, 7, 8)
+    val dateVal = Date.valueOf("2016-07-26")
+    val timestampVal = Timestamp.valueOf("2016-07-26 11:49:45")
+
+    val data = spark.sparkContext.parallelize(Seq(
+      Row(
+        booleanVal, integerVal, longVal, floatVal, doubleVal, byteVal, shortVal, stringVal,
+        binaryVal, dateVal, timestampVal
+      )))
+
+    val dfWrite = spark.createDataFrame(data, schema)
+    dfWrite.write.jdbc(jdbcUrl, tableName, props)
+
+    val dfRead = spark.read.jdbc(jdbcUrl, tableName, props)
+    val rows = dfRead.collect()
+    // verify that the data types are read back as expected
+    val types = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(types(0).equals("class java.lang.Boolean"))
+    assert(types(1).equals("class java.lang.Integer"))
+    assert(types(2).equals("class java.lang.Long"))
+    assert(types(3).equals("class java.lang.Float"))
+    assert(types(4).equals("class java.lang.Float"))
+    assert(types(5).equals("class java.lang.Integer"))
+    assert(types(6).equals("class java.lang.Integer"))
+    assert(types(7).equals("class java.lang.String"))
+    assert(types(8).equals("class [B"))
+    assert(types(9).equals("class java.sql.Date"))
+    assert(types(10).equals("class java.sql.Timestamp"))
+    // verify that the inserted values are read back correctly
+    val values = rows(0)
+    assert(values.getBoolean(0).equals(booleanVal))
+    assert(values.getInt(1).equals(integerVal))
+    assert(values.getLong(2).equals(longVal))
+    assert(values.getFloat(3).equals(floatVal))
+    assert(values.getFloat(4).equals(doubleVal.toFloat))
+    assert(values.getInt(5).equals(byteVal.toInt))
+    assert(values.getInt(6).equals(shortVal.toInt))
+    assert(values.getString(7).equals(stringVal))
+    assert(values.getAs[Array[Byte]](8).mkString.equals("678"))
+    assert(values.getDate(9).equals(dateVal))
+    assert(values.getTimestamp(10).equals(timestampVal))
+  }
 }
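
For context, the Oracle column types that the dfWrite.write.jdbc call above should
request follow from the OracleDialect changes below. A minimal sketch (not part of
the patch), assuming the same JdbcDialects/JdbcUtils lookup that JDBCSuite uses
further down; the URL and the printed output are illustrative only:

    import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
    import org.apache.spark.sql.jdbc.JdbcDialects
    import org.apache.spark.sql.types._

    // Resolve the dialect the same way the JDBC data source does for an Oracle URL.
    val dialect = JdbcDialects.get("jdbc:oracle://127.0.0.1/db")

    val schema = StructType(Seq(
      StructField("boolean_type", BooleanType),
      StructField("double_type", DoubleType),
      StructField("byte_type", ByteType),
      StructField("binary_type", BinaryType)))

    // Dialect-specific mapping first, then the common JDBC mapping as a fallback.
    schema.fields.foreach { f =>
      val jdbcType = dialect.getJDBCType(f.dataType)
        .orElse(JdbcUtils.getCommonJDBCType(f.dataType)).get
      println(s"${f.name} ${jdbcType.databaseTypeDefinition}")
    }
    // boolean_type NUMBER(1)
    // double_type NUMBER(19, 4)
    // byte_type NUMBER(3)
    // binary_type BLOB

Because DoubleType is stored as NUMBER(19, 4) and ByteType/ShortType as small NUMBER
precisions, the read-back assertions above expect java.lang.Float and java.lang.Integer
rather than the originally written Java/Scala types.
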
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index ce8731efd16608dfd267b0cdbe31a9d66187f418..f541996b651e974fbfeb9c95d9ec25c98ed55be6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -28,28 +28,42 @@ private case object OracleDialect extends JdbcDialect {
 
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    // Handle NUMBER fields that have no precision/scale in special way
-    // because JDBC ResultSetMetaData converts this to 0 precision and -127 scale
-    // For more details, please see
-    // https://github.com/apache/spark/pull/8780#issuecomment-145598968
-    // and
-    // https://github.com/apache/spark/pull/8780#issuecomment-144541760
-    if (sqlType == Types.NUMERIC && size == 0) {
-      // This is sub-optimal as we have to pick a precision/scale in advance whereas the data
-      //  in Oracle is allowed to have different precision/scale for each value.
-      Option(DecimalType(DecimalType.MAX_PRECISION, 10))
-    } else if (sqlType == Types.NUMERIC && md.build().getLong("scale") == -127) {
-      // Handle FLOAT fields in a special way because JDBC ResultSetMetaData converts
-      // this to NUMERIC with -127 scale
-      // Not sure if there is a more robust way to identify the field as a float (or other
-      // numeric types that do not specify a scale.
-      Option(DecimalType(DecimalType.MAX_PRECISION, 10))
+    if (sqlType == Types.NUMERIC) {
+      val scale = if (null != md) md.build().getLong("scale") else 0L
+      size match {
+        // Handle NUMBER fields that have no precision/scale in a special way
+        // because JDBC ResultSetMetaData converts this to 0 precision and -127 scale.
+        // For more details, please see
+        // https://github.com/apache/spark/pull/8780#issuecomment-145598968
+        // and
+        // https://github.com/apache/spark/pull/8780#issuecomment-144541760
+        case 0 => Option(DecimalType(DecimalType.MAX_PRECISION, 10))
+        // Handle FLOAT fields in a special way because JDBC ResultSetMetaData converts
+        // this to NUMERIC with -127 scale.
+        // Not sure if there is a more robust way to identify the field as a float (or
+        // other numeric types that do not specify a scale).
+        case _ if scale == -127L => Option(DecimalType(DecimalType.MAX_PRECISION, 10))
+        case 1 => Option(BooleanType)
+        case 3 | 5 | 10 => Option(IntegerType)
+        case 19 if scale == 0L => Option(LongType)
+        case 19 if scale == 4L => Option(FloatType)
+        case _ => None
+      }
     } else {
       None
     }
   }
 
   override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+    // For more details, please see
+    // https://docs.oracle.com/cd/E19501-01/819-3659/gcmaz/
+    case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.BOOLEAN))
+    case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.INTEGER))
+    case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.BIGINT))
+    case FloatType => Some(JdbcType("NUMBER(19, 4)", java.sql.Types.FLOAT))
+    case DoubleType => Some(JdbcType("NUMBER(19, 4)", java.sql.Types.DOUBLE))
+    case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.SMALLINT))
+    case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.SMALLINT))
     case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
     case _ => None
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 995b1200a229430ec7038c46cafa8d8df7de8edc..2d8ee338a98046e22432e05a8344271b603e1b30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -739,6 +739,27 @@ class JDBCSuite extends SparkFunSuite
       map(_.databaseTypeDefinition).get == "VARCHAR2(255)")
   }
 
+  test("SPARK-16625: General data types to be mapped to Oracle") {
+
+    def getJdbcType(dialect: JdbcDialect, dt: DataType): String = {
+      dialect.getJDBCType(dt).orElse(JdbcUtils.getCommonJDBCType(dt)).
+        map(_.databaseTypeDefinition).get
+    }
+
+    val oracleDialect = JdbcDialects.get("jdbc:oracle://127.0.0.1/db")
+    assert(getJdbcType(oracleDialect, BooleanType) == "NUMBER(1)")
+    assert(getJdbcType(oracleDialect, IntegerType) == "NUMBER(10)")
+    assert(getJdbcType(oracleDialect, LongType) == "NUMBER(19)")
+    assert(getJdbcType(oracleDialect, FloatType) == "NUMBER(19, 4)")
+    assert(getJdbcType(oracleDialect, DoubleType) == "NUMBER(19, 4)")
+    assert(getJdbcType(oracleDialect, ByteType) == "NUMBER(3)")
+    assert(getJdbcType(oracleDialect, ShortType) == "NUMBER(5)")
+    assert(getJdbcType(oracleDialect, StringType) == "VARCHAR2(255)")
+    assert(getJdbcType(oracleDialect, BinaryType) == "BLOB")
+    assert(getJdbcType(oracleDialect, DateType) == "DATE")
+    assert(getJdbcType(oracleDialect, TimestampType) == "TIMESTAMP")
+  }
+
   private def assertEmptyQuery(sqlString: String): Unit = {
     assert(sql(sqlString).collect().isEmpty)
   }