diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 3025660301663bf6a35ad8d3e3d9842bc55228bc..4e6dcaa8f48a2f07211d43cfb6d9ef6abf46c5f1 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -35,7 +35,7 @@ import org.apache.hive.service.cli.session.HiveSession
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row => SparkRow}
 import org.apache.spark.sql.execution.command.SetCommand
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, HiveUtils}
+import org.apache.spark.sql.hive.{HiveContext, HiveUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{Utils => SparkUtils}
@@ -60,7 +60,7 @@ private[hive] class SparkExecuteStatementOperation(
     } else {
       logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
       val schema = result.queryExecution.analyzed.output.map { attr =>
-        new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
+        new FieldSchema(attr.name, attr.dataType.catalogString, "")
       }
       new TableSchema(schema.asJava)
     }
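
The swap above is the heart of this patch: DataType.catalogString already renders the Hive-compatible name for any Catalyst type, which makes the hand-rolled HiveMetastoreTypes.toMetastoreType mapping redundant. A minimal sketch of what catalogString produces, assuming a Spark 2.0-era org.apache.spark.sql.types API:

    import org.apache.spark.sql.types._

    val schema = new StructType()
      .add("id", LongType)
      .add("tags", ArrayType(StringType))
      .add("price", DecimalType(10, 2))

    // Each field renders as its Hive metastore type name:
    //   id -> bigint, tags -> array<string>, price -> decimal(10,2)
    schema.fields.foreach(f => println(s"${f.name} -> ${f.dataType.catalogString}"))

    // A whole struct renders as the Hive type string, untruncated:
    //   struct<id:bigint,tags:array<string>,price:decimal(10,2)>
    println(schema.catalogString)
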
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index f730952507a78415588e86fdd03f33d827c50506..1fa885177e862ccd6f46099fd899cadc31a42d3e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -29,10 +29,9 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.hive.HiveContext
 
-private[hive] class SparkSQLDriver(
-    val context: HiveContext = SparkSQLEnv.hiveContext)
+private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext)
   extends Driver
   with Logging {
 
@@ -49,7 +48,7 @@ private[hive] class SparkSQLDriver(
       new Schema(Arrays.asList(new FieldSchema("Response code", "string", "")), null)
     } else {
       val fieldSchemas = analyzed.output.map { attr =>
-        new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
+        new FieldSchema(attr.name, attr.dataType.catalogString, "")
       }
 
       new Schema(fieldSchemas.asJava, null)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6ccff454b16a5b518a7d056564f71176d607c0d7..9353e9ccd22626b53e1006573bfb1a0614c00fd0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -270,7 +269,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
           serdeProperties = options
         ),
         schema = relation.schema.map { f =>
-          CatalogColumn(f.name, HiveMetastoreTypes.toMetastoreType(f.dataType))
+          CatalogColumn(f.name, f.dataType.catalogString)
         },
         properties = tableProperties.toMap,
         viewText = None) // TODO: We need to place the SQL string here
@@ -637,7 +636,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
           table.schema
         } else {
           child.output.map { a =>
-            CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType), a.nullable)
+            CatalogColumn(a.name, a.dataType.catalogString, a.nullable)
           }
         }
 
@@ -770,35 +769,3 @@ private[hive] case class InsertIntoHiveTable(
     case (childAttr, tableAttr) => childAttr.dataType.sameType(tableAttr.dataType)
   }
 }
-
-
-private[hive] object HiveMetastoreTypes {
-  def toDataType(metastoreType: String): DataType = DataTypeParser.parse(metastoreType)
-
-  def decimalMetastoreString(decimalType: DecimalType): String = decimalType match {
-    case DecimalType.Fixed(precision, scale) => s"decimal($precision,$scale)"
-    case _ => s"decimal($HiveShim.UNLIMITED_DECIMAL_PRECISION,$HiveShim.UNLIMITED_DECIMAL_SCALE)"
-  }
-
-  def toMetastoreType(dt: DataType): String = dt match {
-    case ArrayType(elementType, _) => s"array<${toMetastoreType(elementType)}>"
-    case StructType(fields) =>
-      s"struct<${fields.map(f => s"${f.name}:${toMetastoreType(f.dataType)}").mkString(",")}>"
-    case MapType(keyType, valueType, _) =>
-      s"map<${toMetastoreType(keyType)},${toMetastoreType(valueType)}>"
-    case StringType => "string"
-    case FloatType => "float"
-    case IntegerType => "int"
-    case ByteType => "tinyint"
-    case ShortType => "smallint"
-    case DoubleType => "double"
-    case LongType => "bigint"
-    case BinaryType => "binary"
-    case BooleanType => "boolean"
-    case DateType => "date"
-    case d: DecimalType => decimalMetastoreString(d)
-    case TimestampType => "timestamp"
-    case NullType => "void"
-    case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType)
-  }
-}
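
Every branch of the object deleted above folds into catalogString: primitives map to the same Hive names, decimals render as decimal(p,s), complex types recurse, and a UserDefinedType delegates to its underlying sqlType (the HiveMetastoreCatalogSuite change further down asserts exactly that). The unlimited-decimal fallback also looks like it carries a latent interpolation quirk ($HiveShim interpolates the object itself, leaving .UNLIMITED_DECIMAL_PRECISION as literal text), one more reason to retire the hand-rolled mapping. A quick sanity check, again a sketch against the same era of the API:

    import org.apache.spark.sql.types._

    // The same strings the removed toMetastoreType produced:
    println(DecimalType(20, 4).catalogString)                // decimal(20,4)
    println(MapType(StringType, IntegerType).catalogString)  // map<string,int>
    println(ArrayType(TimestampType).catalogString)          // array<timestamp>
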
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index ceb7f3b890949146b556e08816f8fa6a50cf9231..f4e26fab6f2e7919164711090ab6a703c10dd599 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -18,11 +18,10 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, MetastoreRelation}
+import org.apache.spark.sql.hive.MetastoreRelation
 
 /**
  * Create table and insert the query result into it.
@@ -62,7 +61,7 @@ case class CreateTableAsSelect(
         // Hive doesn't support specifying the column list for target table in CTAS
         // However we don't think SparkSQL should follow that.
         tableDesc.copy(schema = query.output.map { c =>
-          CatalogColumn(c.name, HiveMetastoreTypes.toMetastoreType(c.dataType))
+          CatalogColumn(c.name, c.dataType.catalogString)
         })
       } else {
         withFormat
@@ -85,7 +84,8 @@ case class CreateTableAsSelect(
         throw new AnalysisException(s"$tableIdentifier already exists.")
       }
     } else {
-      sqlContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd
+      sqlContext.executePlan(InsertIntoTable(
+        metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
     }
 
     Seq.empty[Row]
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 8248a112a0af4025c85b4c9334f4a6c85036db75..da7b73ae64289abfadbad2fd98944cb57acae092 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.hive.HiveMetastoreTypes
+import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.types.StructType
 
 private[orc] object OrcFileOperator extends Logging {
@@ -78,7 +78,7 @@ private[orc] object OrcFileOperator extends Logging {
       val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
       val schema = readerInspector.getTypeName
       logDebug(s"Reading schema from file $paths, got Hive schema string: $schema")
-      HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
+      DataTypeParser.parse(schema).asInstanceOf[StructType]
     }
   }
 
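
On the read path, the generic DataTypeParser replaces the one-line HiveMetastoreTypes.toDataType wrapper. A sketch of the round trip on the kind of type string an ORC reader's object inspector reports (DataTypeParser lives at this import path in this era of the codebase; the schema string below is illustrative):

    import org.apache.spark.sql.catalyst.parser.DataTypeParser
    import org.apache.spark.sql.types.StructType

    // An ORC file's StructObjectInspector reports a Hive type string like:
    val hiveTypeString = "struct<id:bigint,name:string,scores:array<double>>"

    // DataTypeParser.parse turns it back into a Catalyst DataType
    val schema = DataTypeParser.parse(hiveTypeString).asInstanceOf[StructType]
    assert(schema.fieldNames.sameElements(Array("id", "name", "scores")))
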
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index b0f32faa5c8e8a489cc437cead863064f0e83423..4250a873412b0ec7e618817b0a05bc31d93590f8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.{HiveInspectors, HiveMetastoreTypes, HiveShim}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
 import org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -186,9 +186,7 @@ private[orc] class OrcOutputWriter(
   private val serializer = {
     val table = new Properties()
     table.setProperty("columns", dataSchema.fieldNames.mkString(","))
-    table.setProperty("columns.types", dataSchema.map { f =>
-      HiveMetastoreTypes.toMetastoreType(f.dataType)
-    }.mkString(":"))
+    table.setProperty("columns.types", dataSchema.map(_.dataType.catalogString).mkString(":"))
 
     val serde = new OrcSerde
     val configuration = context.getConfiguration
@@ -198,10 +196,7 @@ private[orc] class OrcOutputWriter(
 
   // Object inspector converted from the schema of the relation to be written.
   private val structOI = {
-    val typeInfo =
-      TypeInfoUtils.getTypeInfoFromTypeString(
-        HiveMetastoreTypes.toMetastoreType(dataSchema))
-
+    val typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(dataSchema.catalogString)
     OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
       .asInstanceOf[SettableStructObjectInspector]
   }
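
The write path leans on catalogString for a StructType being exactly the struct<...> syntax Hive's own type parser accepts, so the schema round-trips into an ObjectInspector without the removed helper. A sketch, assuming the Hive serde2 classes already on the classpath here:

    import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
    import org.apache.spark.sql.types._

    val dataSchema = new StructType().add("id", LongType).add("name", StringType)

    // catalogString yields "struct<id:bigint,name:string>", which Hive
    // parses straight back into a StructTypeInfo
    val typeInfo = TypeInfoUtils
      .getTypeInfoFromTypeString(dataSchema.catalogString)
      .asInstanceOf[StructTypeInfo]
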
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 2a201c195f167dc91a6dd2c01996f961f3cbdbad..d1a1490f666d7b294803db5ec373b8ce30ee8877 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
@@ -32,14 +33,14 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton {
 
   test("struct field should accept underscore in sub-column name") {
     val hiveTypeStr = "struct<a: int, b_1: string, c: string>"
-    val dateType = HiveMetastoreTypes.toDataType(hiveTypeStr)
+    val dateType = DataTypeParser.parse(hiveTypeStr)
     assert(dateType.isInstanceOf[StructType])
   }
 
   test("udt to metastore type conversion") {
     val udt = new ExamplePointUDT
-    assertResult(HiveMetastoreTypes.toMetastoreType(udt.sqlType)) {
-      HiveMetastoreTypes.toMetastoreType(udt)
+    assertResult(udt.sqlType.catalogString) {
+      udt.catalogString
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index dc87daae7248e4f3120f7c4b6f7645a3fb86c1e0..11165a7ebbe8c3ee4083ffb5805c7bbe3092b3a8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -918,7 +919,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       // As a proxy for verifying that the table was stored in Hive compatible format,
       // we verify that each column of the table is of native type StringType.
       assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema
-        .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
+        .forall(column => DataTypeParser.parse(column.dataType) == StringType))
 
       sessionState.catalog.createDataSourceTable(
         name = TableIdentifier("skip_hive_metadata"),
@@ -932,7 +933,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       // As a proxy for verifying that the table was stored in SparkSQL format,
       // we verify that the table has a column type as array of StringType.
       assert(sharedState.externalCatalog.getTable("default", "skip_hive_metadata")
-        .schema.forall { c => HiveMetastoreTypes.toDataType(c.dataType) == ArrayType(StringType) })
+        .schema.forall { c => DataTypeParser.parse(c.dataType) == ArrayType(StringType) })
     }
   }
 }