From 1a95397bb6a4e7e7a06ac450bf556fa3aa47b8cd Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Thu, 21 Apr 2016 15:59:37 -0700
Subject: [PATCH] [SPARK-14798][SQL] Move native command and script
 transformation parsing into SparkSqlAstBuilder

## What changes were proposed in this pull request?
This patch moves native command and script transformation into SparkSqlAstBuilder. This builds on #12561. See the last commit for diff.

## How was this patch tested?
Updated test cases to reflect this.

Author: Reynold Xin <rxin@databricks.com>

Closes #12564 from rxin/SPARK-14798.
---
 .../plans/logical/ScriptTransformation.scala  | 64 ++++++++++++-
 .../spark/sql/execution/SparkSqlParser.scala  | 95 ++++++++++++++++++-
 .../command}/HiveNativeCommand.scala          | 11 +--
 .../spark/sql/internal/SessionState.scala     |  2 +-
 .../execution/command/DDLCommandSuite.scala   | 20 ----
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  3 +-
 .../spark/sql/hive/HiveQueryExecution.scala   |  4 +-
 .../spark/sql/hive/HiveStrategies.scala       |  5 +-
 .../apache/spark/sql/hive/SQLBuilder.scala    | 10 +-
 .../sql/hive/execution/HiveSqlParser.scala    | 84 +---------------
 .../hive/execution/ScriptTransformation.scala | 67 ++++---------
 .../apache/spark/sql/hive/test/TestHive.scala |  3 +-
 .../spark/sql/hive/HiveDDLCommandSuite.scala  |  3 +-
 .../spark/sql/hive/StatisticsSuite.scala      |  1 +
 .../hive/execution/HiveComparisonTest.scala   |  2 +-
 15 files changed, 192 insertions(+), 182 deletions(-)
 rename sql/{hive/src/main/scala/org/apache/spark/sql/hive/execution => core/src/main/scala/org/apache/spark/sql/execution/command}/HiveNativeCommand.scala (82%)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
index 578027da77..e176e9b82b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
@@ -37,7 +37,65 @@ case class ScriptTransformation(
 }
 
 /**
- * A placeholder for implementation specific input and output properties when passing data
- * to a script. For example, in Hive this would specify which SerDes to use.
+ * Input and output properties when passing data to a script.
+ * For example, in Hive this would specify which SerDes to use.
  */
-trait ScriptInputOutputSchema
+case class ScriptInputOutputSchema(
+    inputRowFormat: Seq[(String, String)],
+    outputRowFormat: Seq[(String, String)],
+    inputSerdeClass: Option[String],
+    outputSerdeClass: Option[String],
+    inputSerdeProps: Seq[(String, String)],
+    outputSerdeProps: Seq[(String, String)],
+    recordReaderClass: Option[String],
+    recordWriterClass: Option[String],
+    schemaLess: Boolean) {
+
+  def inputRowFormatSQL: Option[String] =
+    getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps)
+
+  def outputRowFormatSQL: Option[String] =
+    getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps)
+
+  /**
+   * Get the row format specification
+   * Note:
+   * 1. Changes are needed when readerClause and writerClause are supported.
+   * 2. Changes are needed when "ESCAPED BY" is supported.
+   */
+  private def getRowFormatSQL(
+    rowFormat: Seq[(String, String)],
+    serdeClass: Option[String],
+    serdeProps: Seq[(String, String)]): Option[String] = {
+    if (schemaLess) return Some("")
+
+    val rowFormatDelimited =
+      rowFormat.map {
+        case ("TOK_TABLEROWFORMATFIELD", value) =>
+          "FIELDS TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATCOLLITEMS", value) =>
+          "COLLECTION ITEMS TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATMAPKEYS", value) =>
+          "MAP KEYS TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATLINES", value) =>
+          "LINES TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATNULL", value) =>
+          "NULL DEFINED AS " + value
+        case o => return None
+      }
+
+    val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("")
+    val serdePropsSQL =
+      if (serdeClass.nonEmpty) {
+        val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ")
+        if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else ""
+      } else {
+        ""
+      }
+    if (rowFormat.nonEmpty) {
+      Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" "))
+    } else {
+      Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL)
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index ac12a72fc6..05fb1ef631 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution
 
 import scala.collection.JavaConverters._
+import scala.util.Try
 
 import org.antlr.v4.runtime.{ParserRuleContext, Token}
 
@@ -26,16 +27,27 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
 import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, DescribeCommand => _, _}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
+
 
 /**
  * Concrete parser for Spark SQL statements.
  */
-class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser{
+class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
   val astBuilder = new SparkSqlAstBuilder(conf)
+
+  private val substitutor = new VariableSubstitution(conf)
+
+  protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
+    super.parse(substitutor.substitute(command))(toResult)
+  }
+
+  protected override def nativeCommand(sqlText: String): LogicalPlan = {
+    HiveNativeCommand(substitutor.substitute(sqlText))
+  }
 }
 
 /**
@@ -44,6 +56,14 @@ class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser{
 class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   import org.apache.spark.sql.catalyst.parser.ParserUtils._
 
+  /**
+   * Pass a command to Hive using a [[HiveNativeCommand]].
+   */
+  override def visitExecuteNativeCommand(
+      ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) {
+    HiveNativeCommand(command(ctx))
+  }
+
   /**
    * Create a [[SetCommand]] logical plan.
    *
@@ -1127,4 +1147,73 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         Option(col.STRING).map(string))
     }
   }
+
+  /**
+   * Create a [[ScriptInputOutputSchema]].
+   */
+  override protected def withScriptIOSchema(
+      ctx: QuerySpecificationContext,
+      inRowFormat: RowFormatContext,
+      recordWriter: Token,
+      outRowFormat: RowFormatContext,
+      recordReader: Token,
+      schemaLess: Boolean): ScriptInputOutputSchema = {
+    if (recordWriter != null || recordReader != null) {
+      throw new ParseException(
+        "Unsupported operation: Used defined record reader/writer classes.", ctx)
+    }
+
+    // Decode and input/output format.
+    type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
+    def format(fmt: RowFormatContext, configKey: String): Format = fmt match {
+      case c: RowFormatDelimitedContext =>
+        // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
+        // expects a seq of pairs in which the old parsers' token names are used as keys.
+        // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
+        // retrieving the key value pairs ourselves.
+        def entry(key: String, value: Token): Seq[(String, String)] = {
+          Option(value).map(t => key -> t.getText).toSeq
+        }
+        val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
+          entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
+          entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
+          entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
+          entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
+
+        (entries, None, Seq.empty, None)
+
+      case c: RowFormatSerdeContext =>
+        // Use a serde format.
+        val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)
+
+        // SPARK-10310: Special cases LazySimpleSerDe
+        val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
+          Try(conf.getConfString(configKey)).toOption
+        } else {
+          None
+        }
+        (Seq.empty, Option(name), props.toSeq, recordHandler)
+
+      case null =>
+        // Use default (serde) format.
+        val name = conf.getConfString("hive.script.serde",
+          "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
+        val props = Seq("field.delim" -> "\t")
+        val recordHandler = Try(conf.getConfString(configKey)).toOption
+        (Nil, Option(name), props, recordHandler)
+    }
+
+    val (inFormat, inSerdeClass, inSerdeProps, reader) =
+      format(inRowFormat, "hive.script.recordreader")
+
+    val (outFormat, outSerdeClass, outSerdeProps, writer) =
+      format(outRowFormat, "hive.script.recordwriter")
+
+    ScriptInputOutputSchema(
+      inFormat, outFormat,
+      inSerdeClass, outSerdeClass,
+      inSerdeProps, outSerdeProps,
+      reader, writer,
+      schemaLess)
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
similarity index 82%
rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
index 8c1f4a8dc5..39e441f1c3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
@@ -15,22 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.hive.execution
+package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.HiveSessionState
 import org.apache.spark.sql.types.StringType
 
-private[hive]
+/**
+ * A command that we delegate to Hive. Eventually we should remove this.
+ */
 case class HiveNativeCommand(sql: String) extends RunnableCommand {
 
   override def output: Seq[AttributeReference] =
     Seq(AttributeReference("result", StringType, nullable = false)())
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.sessionState.asInstanceOf[HiveSessionState].runNativeSql(sql).map(Row(_))
+    sqlContext.sessionState.runNativeSql(sql).map(Row(_))
   }
-
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 8563dc3d5a..e1be4b882f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -167,7 +167,7 @@ private[sql] class SessionState(ctx: SQLContext) {
   }
 
   def runNativeSql(sql: String): Seq[String] = {
-    throw new UnsupportedOperationException
+    throw new AnalysisException("Unsupported query: " + sql)
   }
 
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index e99eb02252..a1ffda9656 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -611,26 +611,6 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed2, expected2)
   }
 
-  test("unsupported operations") {
-    intercept[ParseException] {
-      parser.parsePlan("DROP TABLE tab PURGE")
-    }
-    intercept[ParseException] {
-      parser.parsePlan("DROP TABLE tab FOR REPLICATION('eventid')")
-    }
-    intercept[ParseException] {
-      parser.parsePlan(
-        """
-          |CREATE EXTERNAL TABLE oneToTenDef
-          |USING org.apache.spark.sql.sources
-          |OPTIONS (from '1', to '10')
-        """.stripMargin)
-    }
-    intercept[ParseException] {
-      parser.parsePlan("SELECT TRANSFORM (key, value) USING 'cat' AS (tKey, tValue) FROM testData")
-    }
-  }
-
   test("SPARK-14383: DISTRIBUTE and UNSET as non-keywords") {
     val sql = "SELECT distribute, unset FROM x"
     val parsed = parser.parsePlan(sql)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8732285dac..ca397910c6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -32,10 +32,9 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand}
+import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
 import org.apache.spark.sql.execution.datasources.{Partition => _, _}
 import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
 import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.types._
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
index 1c1bfb610c..0ee34f07fd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.hive
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
-import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
+import org.apache.spark.sql.execution.command.{ExecutedCommand, HiveNativeCommand, SetCommand}
+import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
 
 
 /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index bbdcc8c6c2..8720e54ed6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -38,8 +38,9 @@ private[hive] trait HiveStrategies {
 
   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.ScriptTransformation(input, script, output, child, schema: HiveScriptIOSchema) =>
-        ScriptTransformation(input, script, output, planLater(child), schema)(hiveconf) :: Nil
+      case logical.ScriptTransformation(input, script, output, child, ioschema) =>
+        val hiveIoSchema = HiveScriptIOSchema(ioschema)
+        ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil
       case _ => Nil
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 2d44813f0e..86115d0e9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.execution.HiveScriptIOSchema
 import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
 
 /**
@@ -210,13 +209,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
   }
 
   private def scriptTransformationToSQL(plan: ScriptTransformation): String = {
-    val ioSchema = plan.ioschema.asInstanceOf[HiveScriptIOSchema]
-    val inputRowFormatSQL = ioSchema.inputRowFormatSQL.getOrElse(
+    val inputRowFormatSQL = plan.ioschema.inputRowFormatSQL.getOrElse(
       throw new UnsupportedOperationException(
-        s"unsupported row format ${ioSchema.inputRowFormat}"))
-    val outputRowFormatSQL = ioSchema.outputRowFormatSQL.getOrElse(
+        s"unsupported row format ${plan.ioschema.inputRowFormat}"))
+    val outputRowFormatSQL = plan.ioschema.outputRowFormatSQL.getOrElse(
       throw new UnsupportedOperationException(
-        s"unsupported row format ${ioSchema.outputRowFormat}"))
+        s"unsupported row format ${plan.ioschema.outputRowFormat}"))
 
     val outputSchema = plan.output.map { attr =>
       s"${attr.sql} ${attr.dataType.simpleString}"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index 989da92bc7..35530b9814 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -17,16 +17,11 @@
 
 package org.apache.spark.sql.hive.execution
 
-import scala.util.Try
-
-import org.antlr.v4.runtime.Token
-import org.apache.hadoop.hive.serde.serdeConstants
-
-import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.HiveNativeCommand
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 
 /**
@@ -54,14 +49,6 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
 
   import ParserUtils._
 
-  /**
-   * Pass a command to Hive using a [[HiveNativeCommand]].
-   */
-  override def visitExecuteNativeCommand(
-      ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) {
-    HiveNativeCommand(command(ctx))
-  }
-
   /**
    * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
    * options are passed on to Hive) e.g.:
@@ -80,73 +67,4 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
       AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
     }
   }
-
-  /**
-   * Create a [[HiveScriptIOSchema]].
-   */
-  override protected def withScriptIOSchema(
-      ctx: QuerySpecificationContext,
-      inRowFormat: RowFormatContext,
-      recordWriter: Token,
-      outRowFormat: RowFormatContext,
-      recordReader: Token,
-      schemaLess: Boolean): HiveScriptIOSchema = {
-    if (recordWriter != null || recordReader != null) {
-      throw new ParseException(
-        "Unsupported operation: Used defined record reader/writer classes.", ctx)
-    }
-
-    // Decode and input/output format.
-    type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
-    def format(fmt: RowFormatContext, configKey: String): Format = fmt match {
-      case c: RowFormatDelimitedContext =>
-        // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
-        // expects a seq of pairs in which the old parsers' token names are used as keys.
-        // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
-        // retrieving the key value pairs ourselves.
-        def entry(key: String, value: Token): Seq[(String, String)] = {
-          Option(value).map(t => key -> t.getText).toSeq
-        }
-        val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
-          entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
-          entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
-          entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
-          entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
-
-        (entries, None, Seq.empty, None)
-
-      case c: RowFormatSerdeContext =>
-        // Use a serde format.
-        val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)
-
-        // SPARK-10310: Special cases LazySimpleSerDe
-        val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
-          Try(conf.getConfString(configKey)).toOption
-        } else {
-          None
-        }
-        (Seq.empty, Option(name), props.toSeq, recordHandler)
-
-      case null =>
-        // Use default (serde) format.
-        val name = conf.getConfString("hive.script.serde",
-          "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
-        val props = Seq(serdeConstants.FIELD_DELIM -> "\t")
-        val recordHandler = Try(conf.getConfString(configKey)).toOption
-        (Nil, Option(name), props, recordHandler)
-    }
-
-    val (inFormat, inSerdeClass, inSerdeProps, reader) =
-      format(inRowFormat, "hive.script.recordreader")
-
-    val (outFormat, outSerdeClass, outSerdeProps, writer) =
-      format(outRowFormat, "hive.script.recordwriter")
-
-    HiveScriptIOSchema(
-      inFormat, outFormat,
-      inSerdeClass, outSerdeClass,
-      inSerdeProps, outSerdeProps,
-      reader, writer,
-      schemaLess)
-  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 2f7cec354d..8c8becfb87 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -312,6 +312,22 @@ private class ScriptTransformationWriterThread(
   }
 }
 
+private[hive]
+object HiveScriptIOSchema {
+  def apply(input: ScriptInputOutputSchema): HiveScriptIOSchema = {
+    HiveScriptIOSchema(
+      input.inputRowFormat,
+      input.outputRowFormat,
+      input.inputSerdeClass,
+      input.outputSerdeClass,
+      input.inputSerdeProps,
+      input.outputSerdeProps,
+      input.recordReaderClass,
+      input.recordWriterClass,
+      input.schemaLess)
+  }
+}
+
 /**
  * The wrapper class of Hive input and output schema properties
  */
@@ -325,7 +341,8 @@ case class HiveScriptIOSchema (
     outputSerdeProps: Seq[(String, String)],
     recordReaderClass: Option[String],
     recordWriterClass: Option[String],
-    schemaLess: Boolean) extends ScriptInputOutputSchema with HiveInspectors {
+    schemaLess: Boolean)
+  extends HiveInspectors {
 
   private val defaultFormat = Map(
     ("TOK_TABLEROWFORMATFIELD", "\t"),
@@ -402,52 +419,4 @@ case class HiveScriptIOSchema (
       instance
     }
   }
-
-  def inputRowFormatSQL: Option[String] =
-    getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps)
-
-  def outputRowFormatSQL: Option[String] =
-    getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps)
-
-  /**
-   * Get the row format specification
-   * Note:
-   * 1. Changes are needed when readerClause and writerClause are supported.
-   * 2. Changes are needed when "ESCAPED BY" is supported.
-   */
-  private def getRowFormatSQL(
-      rowFormat: Seq[(String, String)],
-      serdeClass: Option[String],
-      serdeProps: Seq[(String, String)]): Option[String] = {
-    if (schemaLess) return Some("")
-
-    val rowFormatDelimited =
-      rowFormat.map {
-        case ("TOK_TABLEROWFORMATFIELD", value) =>
-          "FIELDS TERMINATED BY " + value
-        case ("TOK_TABLEROWFORMATCOLLITEMS", value) =>
-          "COLLECTION ITEMS TERMINATED BY " + value
-        case ("TOK_TABLEROWFORMATMAPKEYS", value) =>
-          "MAP KEYS TERMINATED BY " + value
-        case ("TOK_TABLEROWFORMATLINES", value) =>
-          "LINES TERMINATED BY " + value
-        case ("TOK_TABLEROWFORMATNULL", value) =>
-          "NULL DEFINED AS " + value
-        case o => return None
-      }
-
-    val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("")
-    val serdePropsSQL =
-      if (serdeClass.nonEmpty) {
-        val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ")
-        if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else ""
-      } else {
-        ""
-      }
-    if (rowFormat.nonEmpty) {
-      Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" "))
-    } else {
-      Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL)
-    }
-  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 2bb13996c1..741e3bdd18 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -39,10 +39,9 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.CacheTableCommand
+import org.apache.spark.sql.execution.command.{CacheTableCommand, HiveNativeCommand}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 4c90dbeb1b..e3522567b9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,8 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand}
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
+import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
 import org.apache.spark.sql.hive.test.TestHive
 
 class HiveDDLCommandSuite extends PlanTest {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index e91870492a..7a6f1ce0d1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.HiveNativeCommand
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 994dc4a2d2..77906ef2b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -28,7 +28,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.command.{ExplainCommand, SetCommand}
+import org.apache.spark.sql.execution.command.{ExplainCommand, HiveNativeCommand, SetCommand}
 import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.{InsertIntoHiveTable => LogicalInsertIntoHiveTable}
 import org.apache.spark.sql.hive.SQLBuilder
-- 
GitLab