diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index b2d9b8d2a012f1c7a8e768490d480f6dd6ea2d7d..2f33f2e4ff8d1968302abcb32a730eaa29f16070 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -99,7 +99,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   }
 
   private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
-    // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
     // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
     val split = taskContext.getTaskAttemptID.getTaskID.getId
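As an aside on the naming scheme documented in the updated comment, here is a minimal standalone sketch of how a zero-padded split number composes into such a file name; the jobId and ext values are made up, and the real format string lives in getFilename, which is not shown in this hunk:

    import java.util.UUID

    // Illustrative only: produces names in the style part-00000-<uuid>-c000.parquet.
    // %05d zero-pads the split number but never truncates it, so splits >= 100000
    // simply yield longer names instead of overflowing.
    def exampleFilename(split: Int, jobId: UUID, ext: String): String =
      f"part-$split%05d-$jobId$ext"

    exampleFilename(3, UUID.randomUUID(), "-c000.parquet")      // part-00003-<uuid>-c000.parquet
    exampleFilename(123456, UUID.randomUUID(), "-c000.parquet") // part-123456-<uuid>-c000.parquet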
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index b3ef29f6e34c4481ec29795deac8df427826ba84..dcd9003ec66f5619aa056753fc984eeba29862c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -108,21 +108,18 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
 
 
   /**
-   * Returns the result as a hive compatible sequence of strings.  For native commands, the
-   * execution is simply passed back to Hive.
+   * Returns the result as a Hive-compatible sequence of strings. This is for testing only.
    */
   def hiveResultString(): Seq[String] = executedPlan match {
     case ExecutedCommandExec(desc: DescribeTableCommand) =>
-      SQLExecution.withNewExecutionId(sparkSession, this) {
-        // If it is a describe command for a Hive table, we want to have the output format
-        // be similar with Hive.
-        desc.run(sparkSession).map {
-          case Row(name: String, dataType: String, comment) =>
-            Seq(name, dataType,
-              Option(comment.asInstanceOf[String]).getOrElse(""))
-              .map(s => String.format(s"%-20s", s))
-              .mkString("\t")
-        }
+      // If it is a describe command for a Hive table, we want the output format
+      // to be similar to Hive's.
+      desc.run(sparkSession).map {
+        case Row(name: String, dataType: String, comment) =>
+          Seq(name, dataType,
+            Option(comment.asInstanceOf[String]).getOrElse(""))
+            .map(s => String.format(s"%-20s", s))
+            .mkString("\t")
       }
     // SHOW TABLES in Hive only output table names, while ours outputs database, table name, isTemp.
     case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] =>
@@ -130,13 +127,11 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     case command: ExecutedCommandExec =>
       command.executeCollect().map(_.getString(0))
     case other =>
-      SQLExecution.withNewExecutionId(sparkSession, this) {
-        val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
-        // We need the types so we can output struct field names
-        val types = analyzed.output.map(_.dataType)
-        // Reformat to match hive tab delimited output.
-        result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t")).toSeq
-      }
+      val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
+      // We need the types so we can output struct field names
+      val types = analyzed.output.map(_.dataType)
+      // Reformat to match hive tab delimited output.
+      result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t"))
   }
 
   /** Formats a datum (based on the given data type) and returns the string representation. */
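To make the DESCRIBE formatting kept above concrete, a tiny self-contained sketch of the same padding-and-tab scheme (left-justified 20-character cells joined by tabs); the row values are invented:

    // Illustrative only: mirrors the Hive-style row formatting in hiveResultString.
    def formatDescribeRow(name: String, dataType: String, comment: Option[String]): String =
      Seq(name, dataType, comment.getOrElse(""))
        .map(s => String.format("%-20s", s))
        .mkString("\t")

    formatDescribeRow("id", "int", None)
    formatDescribeRow("name", "string", Some("user name"))
    // => "name                \tstring              \tuser name           "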
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 9b4b8b6fcd910bbcf292e4cf41a5352fdbe94950..4e30d038b198572d9f82c4f424360543bd312c67 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -66,6 +66,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) ::
         new DetermineHiveSerde(conf) ::
+        new HiveAnalysis(sparkSession) ::
         new ResolveDataSource(sparkSession) :: Nil
 
       override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
@@ -88,7 +89,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
           SpecialLimits,
           InMemoryScans,
           HiveTableScans,
-          DataSinks,
           Scripts,
           Aggregation,
           JoinSelection,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d1f11e78b4ea2f40e716e59cfe5a425804973aa3..7987a0a84c728271c7362a9bf64c99fd7721777d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -21,14 +21,14 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -86,6 +86,47 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
   }
 }
 
+class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists)
+        if hasBeenPreprocessed(table.output, table.partitionKeys.toStructType, partSpec, query) =>
+      InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
+
+    case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
+      // Currently `DataFrameWriter.saveAsTable` doesn't support the Append mode of hive serde
+      // tables yet.
+      if (mode == SaveMode.Append) {
+        throw new AnalysisException(
+          "CTAS for hive serde tables does not support append semantics.")
+      }
+
+      val dbName = tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
+      CreateHiveTableAsSelectCommand(
+        tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
+        query,
+        mode == SaveMode.Ignore)
+  }
+
+  /**
+   * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by the analyzer
+   * rule [[PreprocessTableInsertion]]. It is important that this rule ([[HiveAnalysis]]) runs
+   * after [[PreprocessTableInsertion]], which normalizes the column names in the partition spec
+   * and fixes schema mismatches by adding casts.
+   */
+  private def hasBeenPreprocessed(
+      tableOutput: Seq[Attribute],
+      partSchema: StructType,
+      partSpec: Map[String, Option[String]],
+      query: LogicalPlan): Boolean = {
+    val partColNames = partSchema.map(_.name).toSet
+    query.resolved && partSpec.keys.forall(partColNames.contains) && {
+      val staticPartCols = partSpec.filter(_._2.isDefined).keySet
+      val expectedColumns = tableOutput.filterNot(a => staticPartCols.contains(a.name))
+      expectedColumns.toStructType.sameType(query.schema)
+    }
+  }
+}
+
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
   self: SparkPlanner =>
@@ -94,35 +135,9 @@ private[hive] trait HiveStrategies {
 
   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.ScriptTransformation(input, script, output, child, ioschema) =>
+      case ScriptTransformation(input, script, output, child, ioschema) =>
         val hiveIoSchema = HiveScriptIOSchema(ioschema)
-        ScriptTransformation(input, script, output, planLater(child), hiveIoSchema) :: Nil
-      case _ => Nil
-    }
-  }
-
-  object DataSinks extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.InsertIntoTable(
-          table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
-        InsertIntoHiveTable(
-          table, partition, planLater(child), overwrite, ifNotExists) :: Nil
-
-      case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
-        // Currently `DataFrameWriter.saveAsTable` doesn't support
-        // the Append mode of hive serde tables yet.
-        if (mode == SaveMode.Append) {
-          throw new AnalysisException(
-            "CTAS for hive serde tables does not support append semantics.")
-        }
-
-        val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-        val cmd = CreateHiveTableAsSelectCommand(
-          tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
-          query,
-          mode == SaveMode.Ignore)
-        ExecutedCommandExec(cmd) :: Nil
-
+        ScriptTransformationExec(input, script, output, planLater(child), hiveIoSchema) :: Nil
       case _ => Nil
     }
   }
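To make the hasBeenPreprocessed contract above more concrete, a small plain-Scala sketch of its first two checks; the column names and spec values are invented, and the final step of the real rule (comparing StructTypes with sameType) is omitted:

    // Illustrative only: the partition spec may mention only known partition columns,
    // and columns bound to a value ("static" partitions) are not expected to come
    // from the query itself.
    val partColNames = Set("ds", "hr")
    val partSpec = Map("ds" -> Some("2017-01-01"), "hr" -> None)

    val specIsValid = partSpec.keys.forall(partColNames.contains)  // true
    val staticPartCols = partSpec.filter(_._2.isDefined).keySet    // Set("ds")
    val tableOutput = Seq("key", "value", "ds", "hr")
    val expectedFromQuery = tableOutput.filterNot(staticPartCols.contains)
    // expectedFromQuery == Seq("key", "value", "hr"); the query's schema must then
    // match these columns for HiveAnalysis to plan the insert.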
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index aaf30f41f29c2dd6e1edff20d62e74da20aef318..b4b63032ab261c8f1bcd4aade3203f3b718d68da 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -311,10 +311,10 @@ private[hive] object HiveTableUtil {
   // that calls Hive.get() which tries to access metastore, but it's not valid in runtime
   // it would be fixed in next version of hive but till then, we should use this instead
   def configureJobPropertiesForStorageHandler(
-      tableDesc: TableDesc, jobConf: JobConf, input: Boolean) {
+      tableDesc: TableDesc, conf: Configuration, input: Boolean) {
     val property = tableDesc.getProperties.getProperty(META_TABLE_STORAGE)
     val storageHandler =
-      org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(jobConf, property)
+      org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(conf, property)
     if (storageHandler != null) {
       val jobProperties = new java.util.LinkedHashMap[String, String]
       if (input) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index b1b8439efa011e15369f02bc6e86901c444d2459..4e2193b6abc3f4afc1b6b707af84d13a1a8c4207 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
 
 /** Support for interacting with different versions of the HiveMetastoreClient */
 package object client {
-  private[client] abstract class HiveVersion(
+  private[hive] abstract class HiveVersion(
       val fullVersion: String,
       val extraDeps: Seq[String] = Nil,
       val exclusions: Seq[String] = Nil)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
new file mode 100644
index 0000000000000000000000000000000000000000..cc2b60bc419634731c43780c503c0fd709f71445
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat with Logging {
+  override def inferSchema(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    throw new UnsupportedOperationException("inferSchema is not supported for hive data source.")
+  }
+
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    val conf = job.getConfiguration
+    val tableDesc = fileSinkConf.getTableInfo
+    conf.set("mapred.output.format.class", tableDesc.getOutputFileFormatClassName)
+
+    // When speculation is on and the output committer class name contains "Direct", we should
+    // warn users that they may lose data if they are using a direct output committer.
+    val speculationEnabled = sparkSession.sparkContext.conf.getBoolean("spark.speculation", false)
+    val outputCommitterClass = conf.get("mapred.output.committer.class", "")
+    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+      val warningMessage =
+        s"$outputCommitterClass may be an output committer that writes data directly to " +
+          "the final location. Because speculation is enabled, this output committer may " +
+          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
+          "committer that does not have this behavior (e.g. FileOutputCommitter)."
+      logWarning(warningMessage)
+    }
+
+    // Add table properties from the storage handler to the Hadoop configuration, so that any
+    // custom storage handler settings are propagated to it.
+    HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, false)
+    Utilities.copyTableJobPropertiesToConf(tableDesc, conf)
+
+    // Avoid referencing the outer object.
+    val fileSinkConfSer = fileSinkConf
+    new OutputWriterFactory {
+      private val jobConf = new SerializableJobConf(new JobConf(conf))
+      @transient private lazy val outputFormat =
+        jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        Utilities.getFileExtension(jobConf.value, fileSinkConfSer.getCompressed, outputFormat)
+      }
+
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new HiveOutputWriter(path, fileSinkConfSer, jobConf.value, dataSchema)
+      }
+    }
+  }
+}
+
+class HiveOutputWriter(
+    path: String,
+    fileSinkConf: FileSinkDesc,
+    jobConf: JobConf,
+    dataSchema: StructType) extends OutputWriter with HiveInspectors {
+
+  private def tableDesc = fileSinkConf.getTableInfo
+
+  private val serializer = {
+    val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
+    serializer.initialize(null, tableDesc.getProperties)
+    serializer
+  }
+
+  private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
+    jobConf,
+    tableDesc,
+    serializer.getSerializedClass,
+    fileSinkConf,
+    new Path(path),
+    Reporter.NULL)
+
+  private val standardOI = ObjectInspectorUtils
+    .getStandardObjectInspector(
+      tableDesc.getDeserializer.getObjectInspector,
+      ObjectInspectorCopyOption.JAVA)
+    .asInstanceOf[StructObjectInspector]
+
+  private val fieldOIs =
+    standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray
+  private val dataTypes = dataSchema.map(_.dataType).toArray
+  private val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) }
+  private val outputData = new Array[Any](fieldOIs.length)
+
+  override def write(row: InternalRow): Unit = {
+    var i = 0
+    while (i < fieldOIs.length) {
+      outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
+      i += 1
+    }
+    hiveWriter.write(serializer.serialize(outputData, standardOI))
+  }
+
+  override def close(): Unit = {
+    // The boolean value passed into close() does not seem to matter.
+    hiveWriter.close(false)
+  }
+}
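The "Avoid referencing the outer object" comment in prepareWrite is worth a note: referring to fileSinkConf directly from inside the anonymous OutputWriterFactory would capture the enclosing HiveFileFormat through the anonymous class's outer pointer, so the field is first copied into a local val. A minimal standalone illustration of the pattern, with invented class and field names:

    class Outer(val config: String) { // deliberately not Serializable
      // Referring to `config` directly makes the anonymous class capture `this`
      // (the whole Outer instance), so serializing the result would fail with
      // NotSerializableException.
      def badFactory(): java.io.Serializable = new java.io.Serializable {
        override def toString: String = config
      }

      // Copying the field into a local val first means only the String is captured.
      def goodFactory(): java.io.Serializable = {
        val localConfig = config
        new java.io.Serializable {
          override def toString: String = localConfig
        }
      }
    }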
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index aa858e808edf789317a18a055cf463b51f7fa123..ce418ae135dd9d09b4d45f373174ce92b7ebc0ea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -24,22 +24,22 @@ import java.util.{Date, Locale, Random}
 
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.hadoop.hive.ql.ErrorMsg
-import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+import org.apache.spark.sql.hive.client.HiveVersion
 import org.apache.spark.SparkException
-import org.apache.spark.util.SerializableJobConf
 
 
 /**
@@ -69,26 +69,20 @@ import org.apache.spark.util.SerializableJobConf
  *                  {{{
  *                  Map('a' -> Some('1'), 'b' -> None)
  *                  }}}.
- * @param child the logical plan representing data to write to.
+ * @param query the logical plan representing data to write to.
  * @param overwrite overwrite existing table or partitions.
  * @param ifNotExists If true, only write if the table or partition does not exist.
  */
 case class InsertIntoHiveTable(
     table: MetastoreRelation,
     partition: Map[String, Option[String]],
-    child: SparkPlan,
+    query: LogicalPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean) extends UnaryExecNode {
+    ifNotExists: Boolean) extends RunnableCommand {
 
-  @transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
-  @transient private val externalCatalog = sqlContext.sharedState.externalCatalog
+  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
 
-  def output: Seq[Attribute] = Seq.empty
-
-  val hadoopConf = sessionState.newHadoopConf()
   var createdTempDir: Option[Path] = None
-  val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
-  val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
   private def executionId: String = {
     val rand: Random = new Random
@@ -96,7 +90,10 @@ case class InsertIntoHiveTable(
     "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
   }
 
-  private def getStagingDir(inputPath: Path): Path = {
+  private def getStagingDir(
+      inputPath: Path,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
     val inputPathUri: URI = inputPath.toUri
     val inputPathName: String = inputPathUri.getPath
     val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
@@ -121,17 +118,27 @@ case class InsertIntoHiveTable(
         throw new RuntimeException(
           "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
     }
-    return dir
+    dir
   }
 
-  private def getExternalScratchDir(extURI: URI): Path = {
-    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
+  private def getExternalScratchDir(
+      extURI: URI,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
+    getStagingDir(
+      new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
+      hadoopConf,
+      stagingDir)
   }
 
-  def getExternalTmpPath(path: Path): Path = {
+  def getExternalTmpPath(
+      path: Path,
+      hiveVersion: HiveVersion,
+      hadoopConf: Configuration,
+      stagingDir: String,
+      scratchDir: String): Path = {
     import org.apache.spark.sql.hive.client.hive._
 
-    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
     // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
     // a common scratch directory. After the writing is finished, Hive will simply empty the table
     // directory and move the staging directory to it.
@@ -142,16 +149,19 @@ case class InsertIntoHiveTable(
     // staging directory under the table directory for Hive prior to 1.1, the staging directory will
     // be removed by Hive when Hive is trying to empty the table directory.
     if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
-      oldVersionExternalTempPath(path)
+      oldVersionExternalTempPath(path, hadoopConf, scratchDir)
     } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
-      newVersionExternalTempPath(path)
+      newVersionExternalTempPath(path, hadoopConf, stagingDir)
     } else {
       throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
     }
   }
 
   // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
-  def oldVersionExternalTempPath(path: Path): Path = {
+  def oldVersionExternalTempPath(
+      path: Path,
+      hadoopConf: Configuration,
+      scratchDir: String): Path = {
     val extURI: URI = path.toUri
     val scratchPath = new Path(scratchDir, executionId)
     var dirPath = new Path(
@@ -176,54 +186,44 @@ case class InsertIntoHiveTable(
   }
 
   // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
-  def newVersionExternalTempPath(path: Path): Path = {
+  def newVersionExternalTempPath(
+      path: Path,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
     val extURI: URI = path.toUri
     if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path.getParent)
+      getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir)
     } else {
-      new Path(getExternalScratchDir(extURI), "-ext-10000")
+      new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
     }
   }
 
-  def getExtTmpPathRelTo(path: Path): Path = {
-    new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
-  }
-
-  private def saveAsHiveFile(
-      rdd: RDD[InternalRow],
-      valueClass: Class[_],
-      fileSinkConf: FileSinkDesc,
-      conf: SerializableJobConf,
-      writerContainer: SparkHiveWriterContainer): Unit = {
-    assert(valueClass != null, "Output value class not set")
-    conf.value.setOutputValueClass(valueClass)
-
-    val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
-    assert(outputFileFormatClassName != null, "Output format class not set")
-    conf.value.set("mapred.output.format.class", outputFileFormatClassName)
-
-    FileOutputFormat.setOutputPath(
-      conf.value,
-      SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName(), conf.value))
-    log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
-    writerContainer.driverSideSetup()
-    sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
-    writerContainer.commitJob()
+  def getExtTmpPathRelTo(
+      path: Path,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
+    new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
   }
 
   /**
    * Inserts all the rows in the table into Hive.  Row objects are properly serialized with the
    * `org.apache.hadoop.hive.serde2.SerDe` and the
    * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
-   *
-   * Note: this is run once and then kept to avoid double insertions.
    */
-  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val sessionState = sparkSession.sessionState
+    val externalCatalog = sparkSession.sharedState.externalCatalog
+    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
+    val hadoopConf = sessionState.newHadoopConf()
+    val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+    val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
+
     // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = getExternalTmpPath(tableLocation)
+    val tmpLocation =
+      getExternalTmpPath(tableLocation, hiveVersion, hadoopConf, stagingDir, scratchDir)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
 
@@ -276,40 +276,31 @@ case class InsertIntoHiveTable(
       }
     }
 
-    val jobConf = new JobConf(hadoopConf)
-    val jobConfSer = new SerializableJobConf(jobConf)
-
-    // When speculation is on and output committer class name contains "Direct", we should warn
-    // users that they may loss data if they are using a direct output committer.
-    val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
-    val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
-    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
-      val warningMessage =
-        s"$outputCommitterClass may be an output committer that writes data directly to " +
-          "the final location. Because speculation is enabled, this output committer may " +
-          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
-          "committer that does not have this behavior (e.g. FileOutputCommitter)."
-      logWarning(warningMessage)
+    val committer = FileCommitProtocol.instantiate(
+      sparkSession.sessionState.conf.fileCommitProtocolClass,
+      jobId = java.util.UUID.randomUUID().toString,
+      outputPath = tmpLocation.toString,
+      isAppend = false)
+
+    val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
+      query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
+        throw new AnalysisException(
+          s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
+      }.asInstanceOf[Attribute]
     }
 
-    val writerContainer = if (numDynamicPartitions > 0) {
-      val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
-      new SparkHiveDynamicPartitionWriterContainer(
-        jobConf,
-        fileSinkConf,
-        dynamicPartColNames,
-        child.output)
-    } else {
-      new SparkHiveWriterContainer(
-        jobConf,
-        fileSinkConf,
-        child.output)
-    }
-
-    @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
-    saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
+    FileFormatWriter.write(
+      sparkSession = sparkSession,
+      queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
+      fileFormat = new HiveFileFormat(fileSinkConf),
+      committer = committer,
+      outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty),
+      hadoopConf = hadoopConf,
+      partitionColumns = partitionAttributes,
+      bucketSpec = None,
+      refreshFunction = _ => (),
+      options = Map.empty)
 
-    val outputPath = FileOutputFormat.getOutputPath(jobConf)
     // TODO: Correctly set holdDDLTime.
     // In most of the time, we should have holdDDLTime = false.
     // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
@@ -319,7 +310,7 @@ case class InsertIntoHiveTable(
         externalCatalog.loadDynamicPartitions(
           db = table.catalogTable.database,
           table = table.catalogTable.identifier.table,
-          outputPath.toString,
+          tmpLocation.toString,
           partitionSpec,
           overwrite,
           numDynamicPartitions,
@@ -363,7 +354,7 @@ case class InsertIntoHiveTable(
           externalCatalog.loadPartition(
             table.catalogTable.database,
             table.catalogTable.identifier.table,
-            outputPath.toString,
+            tmpLocation.toString,
             partitionSpec,
             isOverwrite = doHiveOverwrite,
             holdDDLTime = holdDDLTime,
@@ -375,7 +366,7 @@ case class InsertIntoHiveTable(
       externalCatalog.loadTable(
         table.catalogTable.database,
         table.catalogTable.identifier.table,
-        outputPath.toString, // TODO: URI
+        tmpLocation.toString, // TODO: URI
         overwrite,
         holdDDLTime,
         isSrcLocal = false)
@@ -391,21 +382,13 @@ case class InsertIntoHiveTable(
     }
 
     // Invalidate the cache.
-    sqlContext.sharedState.cacheManager.invalidateCache(table)
-    sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
+    sparkSession.sharedState.cacheManager.invalidateCache(table)
+    sparkSession.sessionState.catalog.refreshTable(table.catalogTable.identifier)
 
     // It would be nice to just return the childRdd unchanged so insert operations could be chained,
     // however for now we return an empty list to simplify compatibility checks with hive, which
     // does not return anything for insert operations.
     // TODO: implement hive compatibility as rules.
-    Seq.empty[InternalRow]
-  }
-
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1)
+    Seq.empty[Row]
   }
 }
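Since InsertIntoHiveTable is now a RunnableCommand rather than a physical UnaryExecNode, all of its work happens in run(), which the planner wraps in ExecutedCommandExec and invokes once on the driver. A hypothetical minimal command, only to show that shape; the class name and body are invented and not part of this patch:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.execution.command.RunnableCommand

    // Illustrative only: like InsertIntoHiveTable after this change, the command has
    // no doExecute() and no child SparkPlan; its side effect runs once in run() and
    // an empty Seq[Row] is returned, mirroring Hive's behavior for INSERT statements.
    case class LogRowCountCommand(tableName: String) extends RunnableCommand {
      override def run(sparkSession: SparkSession): Seq[Row] = {
        val count = sparkSession.table(tableName).count()
        logInfo(s"$tableName currently has $count rows")
        Seq.empty[Row]
      }
    }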
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
similarity index 99%
rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
index 50855e48bc8fea6231e7e906d82bddf538d77363..e7c165c5f86c5ab49ea16aaf768f104031469d9a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
@@ -52,7 +52,7 @@ import org.apache.spark.util.{CircularBuffer, RedirectThread, SerializableConfig
  * @param script the command that should be executed.
  * @param output the attributes that are produced by the script.
  */
-case class ScriptTransformation(
+case class ScriptTransformationExec(
     input: Seq[Expression],
     script: String,
     output: Seq[Attribute],
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
deleted file mode 100644
index 0c9321068c4c1cc2561bd552b9a7294174b3af37..0000000000000000000000000000000000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.text.NumberFormat
-import java.util.{Date, Locale}
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.common.FileUtils
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
-import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
-import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.mapreduce.TaskType
-
-import org.apache.spark._
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.SparkHadoopWriterUtils
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.UnsafeKVExternalSorter
-import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableJobConf
-import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
-
-/**
- * Internal helper class that saves an RDD using a Hive OutputFormat.
- * It is based on `SparkHadoopWriter`.
- */
-private[hive] class SparkHiveWriterContainer(
-    @transient private val jobConf: JobConf,
-    fileSinkConf: FileSinkDesc,
-    inputSchema: Seq[Attribute])
-  extends Logging
-  with HiveInspectors
-  with Serializable {
-
-  private val now = new Date()
-  private val tableDesc: TableDesc = fileSinkConf.getTableInfo
-  // Add table properties from storage handler to jobConf, so any custom storage
-  // handler settings can be set to jobConf
-  if (tableDesc != null) {
-    HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, jobConf, false)
-    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
-  }
-  protected val conf = new SerializableJobConf(jobConf)
-
-  private var jobID = 0
-  private var splitID = 0
-  private var attemptID = 0
-  private var jID: SerializableWritable[JobID] = null
-  private var taID: SerializableWritable[TaskAttemptID] = null
-
-  @transient private var writer: FileSinkOperator.RecordWriter = null
-  @transient protected lazy val committer = conf.value.getOutputCommitter
-  @transient protected lazy val jobContext = new JobContextImpl(conf.value, jID.value)
-  @transient private lazy val taskContext = new TaskAttemptContextImpl(conf.value, taID.value)
-  @transient private lazy val outputFormat =
-    conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
-
-  def driverSideSetup() {
-    setIDs(0, 0, 0)
-    setConfParams()
-    committer.setupJob(jobContext)
-  }
-
-  def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) {
-    setIDs(jobId, splitId, attemptId)
-    setConfParams()
-    committer.setupTask(taskContext)
-    initWriters()
-  }
-
-  protected def getOutputName: String = {
-    val numberFormat = NumberFormat.getInstance(Locale.US)
-    numberFormat.setMinimumIntegerDigits(5)
-    numberFormat.setGroupingUsed(false)
-    val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat)
-    "part-" + numberFormat.format(splitID) + extension
-  }
-
-  def close() {
-    // Seems the boolean value passed into close does not matter.
-    if (writer != null) {
-      writer.close(false)
-      commit()
-    }
-  }
-
-  def commitJob() {
-    committer.commitJob(jobContext)
-  }
-
-  protected def initWriters() {
-    // NOTE this method is executed at the executor side.
-    // For Hive tables without partitions or with only static partitions, only 1 writer is needed.
-    writer = HiveFileFormatUtils.getHiveRecordWriter(
-      conf.value,
-      fileSinkConf.getTableInfo,
-      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
-      fileSinkConf,
-      FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
-      Reporter.NULL)
-  }
-
-  protected def commit() {
-    SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID)
-  }
-
-  def abortTask(): Unit = {
-    if (committer != null) {
-      committer.abortTask(taskContext)
-    }
-    logError(s"Task attempt $taskContext aborted.")
-  }
-
-  private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
-    jobID = jobId
-    splitID = splitId
-    attemptID = attemptId
-
-    jID = new SerializableWritable[JobID](SparkHadoopWriterUtils.createJobID(now, jobId))
-    taID = new SerializableWritable[TaskAttemptID](
-      new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
-  }
-
-  private def setConfParams() {
-    conf.value.set("mapred.job.id", jID.value.toString)
-    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
-    conf.value.set("mapred.task.id", taID.value.toString)
-    conf.value.setBoolean("mapred.task.is.map", true)
-    conf.value.setInt("mapred.task.partition", splitID)
-  }
-
-  def newSerializer(tableDesc: TableDesc): Serializer = {
-    val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
-    serializer.initialize(null, tableDesc.getProperties)
-    serializer
-  }
-
-  protected def prepareForWrite() = {
-    val serializer = newSerializer(fileSinkConf.getTableInfo)
-    val standardOI = ObjectInspectorUtils
-      .getStandardObjectInspector(
-        fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
-        ObjectInspectorCopyOption.JAVA)
-      .asInstanceOf[StructObjectInspector]
-
-    val fieldOIs = standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray
-    val dataTypes = inputSchema.map(_.dataType)
-    val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) }
-    val outputData = new Array[Any](fieldOIs.length)
-    (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData)
-  }
-
-  // this function is executed on executor side
-  def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-    val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = prepareForWrite()
-    executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
-
-    iterator.foreach { row =>
-      var i = 0
-      while (i < fieldOIs.length) {
-        outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
-        i += 1
-      }
-      writer.write(serializer.serialize(outputData, standardOI))
-    }
-
-    close()
-  }
-}
-
-private[hive] object SparkHiveWriterContainer {
-  def createPathFromString(path: String, conf: JobConf): Path = {
-    if (path == null) {
-      throw new IllegalArgumentException("Output path is null")
-    }
-    val outputPath = new Path(path)
-    val fs = outputPath.getFileSystem(conf)
-    if (outputPath == null || fs == null) {
-      throw new IllegalArgumentException("Incorrectly formatted output path")
-    }
-    outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-  }
-}
-
-private[spark] object SparkHiveDynamicPartitionWriterContainer {
-  val SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs"
-}
-
-private[spark] class SparkHiveDynamicPartitionWriterContainer(
-    jobConf: JobConf,
-    fileSinkConf: FileSinkDesc,
-    dynamicPartColNames: Array[String],
-    inputSchema: Seq[Attribute])
-  extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema) {
-
-  import SparkHiveDynamicPartitionWriterContainer._
-
-  private val defaultPartName = jobConf.get(
-    ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultStrVal)
-
-  override protected def initWriters(): Unit = {
-    // do nothing
-  }
-
-  override def close(): Unit = {
-    // do nothing
-  }
-
-  override def commitJob(): Unit = {
-    // This is a hack to avoid writing _SUCCESS mark file. In lower versions of Hadoop (e.g. 1.0.4),
-    // semantics of FileSystem.globStatus() is different from higher versions (e.g. 2.4.1) and will
-    // include _SUCCESS file when glob'ing for dynamic partition data files.
-    //
-    // Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does:
-    // calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then
-    // load it with loadDynamicPartitions/loadPartition/loadTable.
-    val oldMarker = conf.value.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
-    conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
-    super.commitJob()
-    conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
-  }
-
-  // this function is executed on executor side
-  override def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-    val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = prepareForWrite()
-    executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
-
-    val partitionOutput = inputSchema.takeRight(dynamicPartColNames.length)
-    val dataOutput = inputSchema.take(fieldOIs.length)
-    // Returns the partition key given an input row
-    val getPartitionKey = UnsafeProjection.create(partitionOutput, inputSchema)
-    // Returns the data columns to be written given an input row
-    val getOutputRow = UnsafeProjection.create(dataOutput, inputSchema)
-
-    val fun: AnyRef = (pathString: String) => FileUtils.escapePathName(pathString, defaultPartName)
-    // Expressions that given a partition key build a string like: col1=val/col2=val/...
-    val partitionStringExpression = partitionOutput.zipWithIndex.flatMap { case (c, i) =>
-      val escaped =
-        ScalaUDF(fun, StringType, Seq(Cast(c, StringType)), Seq(StringType))
-      val str = If(IsNull(c), Literal(defaultPartName), escaped)
-      val partitionName = Literal(dynamicPartColNames(i) + "=") :: str :: Nil
-      if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName
-    }
-
-    // Returns the partition path given a partition key.
-    val getPartitionString =
-      UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionOutput)
-
-    // If anything below fails, we should abort the task.
-    try {
-      val sorter: UnsafeKVExternalSorter = new UnsafeKVExternalSorter(
-        StructType.fromAttributes(partitionOutput),
-        StructType.fromAttributes(dataOutput),
-        SparkEnv.get.blockManager,
-        SparkEnv.get.serializerManager,
-        TaskContext.get().taskMemoryManager().pageSizeBytes,
-        SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
-          UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
-
-      while (iterator.hasNext) {
-        val inputRow = iterator.next()
-        val currentKey = getPartitionKey(inputRow)
-        sorter.insertKV(currentKey, getOutputRow(inputRow))
-      }
-
-      logInfo(s"Sorting complete. Writing out partition files one at a time.")
-      val sortedIterator = sorter.sortedIterator()
-      var currentKey: InternalRow = null
-      var currentWriter: FileSinkOperator.RecordWriter = null
-      try {
-        while (sortedIterator.next()) {
-          if (currentKey != sortedIterator.getKey) {
-            if (currentWriter != null) {
-              currentWriter.close(false)
-            }
-            currentKey = sortedIterator.getKey.copy()
-            logDebug(s"Writing partition: $currentKey")
-            currentWriter = newOutputWriter(currentKey)
-          }
-
-          var i = 0
-          while (i < fieldOIs.length) {
-            outputData(i) = if (sortedIterator.getValue.isNullAt(i)) {
-              null
-            } else {
-              wrappers(i)(sortedIterator.getValue.get(i, dataTypes(i)))
-            }
-            i += 1
-          }
-          currentWriter.write(serializer.serialize(outputData, standardOI))
-        }
-      } finally {
-        if (currentWriter != null) {
-          currentWriter.close(false)
-        }
-      }
-      commit()
-    } catch {
-      case cause: Throwable =>
-        logError("Aborting task.", cause)
-        abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
-    }
-    /** Open and returns a new OutputWriter given a partition key. */
-    def newOutputWriter(key: InternalRow): FileSinkOperator.RecordWriter = {
-      val partitionPath = getPartitionString(key).getString(0)
-      val newFileSinkDesc = new FileSinkDesc(
-        fileSinkConf.getDirName + partitionPath,
-        fileSinkConf.getTableInfo,
-        fileSinkConf.getCompressed)
-      newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
-      newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
-
-      // use the path like ${hive_tmp}/_temporary/${attemptId}/
-      // to avoid write to the same file when `spark.speculation=true`
-      val path = FileOutputFormat.getTaskOutputPath(
-        conf.value,
-        partitionPath.stripPrefix("/") + "/" + getOutputName)
-
-      HiveFileFormatUtils.getHiveRecordWriter(
-        conf.value,
-        fileSinkConf.getTableInfo,
-        conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
-        newFileSinkDesc,
-        path,
-        Reporter.NULL)
-    }
-  }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 5cb8519d2a9af227e3a403923da95be11709c04d..28b5bfd5819c67ae92ff83f76258b689e79c6974 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -565,8 +565,8 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet
             val filePaths = dir.map(_.getName).toList
             folders.flatMap(listFiles) ++: filePaths
           }
-          val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
-          assert(listFiles(tmpDir).sorted == expectedFiles)
+          // expect 2 files left: `.part-00000-random-uuid.crc` and `part-00000-random-uuid`
+          assert(listFiles(tmpDir).length == 2)
         }
       }
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 05a15166f815bd35d7025f64e17cf95a6b330dcb..4772a264d6fd9116a9b5020900d94c6de1763d40 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
 import org.scalatest.{BeforeAndAfterAll, GivenWhenThen}
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.catalyst.SQLBuilder
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -441,23 +442,20 @@ abstract class HiveComparisonTest
                 val executions = queryList.map(new TestHiveQueryExecution(_))
                 executions.foreach(_.toRdd)
                 val tablesGenerated = queryList.zip(executions).flatMap {
-                  // We should take executedPlan instead of sparkPlan, because in following codes we
-                  // will run the collected plans. As we will do extra processing for sparkPlan such
-                  // as adding exchange, collapsing codegen stages, etc., collecting sparkPlan here
-                  // will cause some errors when running these plans later.
-                  case (q, e) => e.executedPlan.collect {
+                  case (q, e) => e.analyzed.collect {
                     case i: InsertIntoHiveTable if tablesRead contains i.table.tableName =>
                       (q, e, i)
                   }
                 }
 
                 tablesGenerated.map { case (hiveql, execution, insert) =>
+                  val rdd = Dataset.ofRows(TestHive.sparkSession, insert.query).queryExecution.toRdd
                   s"""
                      |=== Generated Table ===
                      |$hiveql
                      |$execution
                      |== Results ==
-                     |${insert.child.execute().collect().mkString("\n")}
+                     |${rdd.collect().mkString("\n")}
                    """.stripMargin
                 }.mkString("\n")
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index ef62be39cd3495d4e33b865a0c07d2337cb05380..882a1841246803b843b49bd7d5d0747f40b2a7ba 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
@@ -799,7 +800,7 @@ class HiveDDLSuite
 
   test("Create Cataloged Table As Select - Drop Table After Runtime Exception") {
     withTable("tab") {
-      intercept[RuntimeException] {
+      intercept[SparkException] {
         sql(
           """
             |CREATE TABLE tab
@@ -1273,7 +1274,7 @@ class HiveDDLSuite
         sql("INSERT INTO t SELECT 1")
         checkAnswer(spark.table("t"), Row(1))
         // Check if this is compressed as ZLIB.
-        val maybeOrcFile = path.listFiles().find(_.getName.endsWith("part-00000"))
+        val maybeOrcFile = path.listFiles().find(!_.getName.endsWith(".crc"))
         assert(maybeOrcFile.isDefined)
         val orcFilePath = maybeOrcFile.get.toPath.toString
         val expectedCompressionKind =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 2ae66d1b2f8a6b87d9216111d0ba1137ce2f0d1d..75ba92cadacd8f6ba20ea3cc3a4861cfa806d3ef 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1043,8 +1043,8 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
 
     assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
       analyzedPlan.collect {
-        case _: Project => ()
-      }.size
+        case i: InsertIntoHiveTable => i.query.collect { case p: Project => () }.size
+      }.sum
     }
   }
 
@@ -1062,8 +1062,8 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
 
     assertResult(2, "Duplicated project detected\n" + analyzedPlan) {
       analyzedPlan.collect {
-        case _: Project => ()
-      }.size
+        case i: InsertIntoHiveTable => i.query.collect { case p: Project => () }.size
+      }.sum
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index d3475a79a7faea96950991462b6e0e2c85467135..5318b4650b01ffc38cfe4ddc06f738b259c3a56c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -55,7 +55,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
     val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
     checkAnswer(
       rowsDf,
-      (child: SparkPlan) => new ScriptTransformation(
+      (child: SparkPlan) => new ScriptTransformationExec(
         input = Seq(rowsDf.col("a").expr),
         script = "cat",
         output = Seq(AttributeReference("a", StringType)()),
@@ -71,7 +71,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
     val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
     checkAnswer(
       rowsDf,
-      (child: SparkPlan) => new ScriptTransformation(
+      (child: SparkPlan) => new ScriptTransformationExec(
         input = Seq(rowsDf.col("a").expr),
         script = "cat",
         output = Seq(AttributeReference("a", StringType)()),
@@ -88,7 +88,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
     val e = intercept[TestFailedException] {
       checkAnswer(
         rowsDf,
-        (child: SparkPlan) => new ScriptTransformation(
+        (child: SparkPlan) => new ScriptTransformationExec(
           input = Seq(rowsDf.col("a").expr),
           script = "cat",
           output = Seq(AttributeReference("a", StringType)()),
@@ -107,7 +107,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
     val e = intercept[TestFailedException] {
       checkAnswer(
         rowsDf,
-        (child: SparkPlan) => new ScriptTransformation(
+        (child: SparkPlan) => new ScriptTransformationExec(
           input = Seq(rowsDf.col("a").expr),
           script = "cat",
           output = Seq(AttributeReference("a", StringType)()),
@@ -126,7 +126,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
 
     val e = intercept[SparkException] {
       val plan =
-        new ScriptTransformation(
+        new ScriptTransformationExec(
           input = Seq(rowsDf.col("a").expr),
           script = "some_non_existent_command",
           output = Seq(AttributeReference("a", StringType)()),