From 23468e7e96bf047ba53806352558b9d661567b23 Mon Sep 17 00:00:00 2001 From: Cheng Lian <lian.cs.zju@gmail.com> Date: Fri, 31 Oct 2014 11:34:51 -0700 Subject: [PATCH] [SPARK-2220][SQL] Fixes remaining Hive commands This PR adds support for the `ADD FILE` Hive command, and removes `ShellCommand` and `SourceCommand`. The reason is described in [this SPARK-2220 comment](https://issues.apache.org/jira/browse/SPARK-2220?focusedCommentId=14191841&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14191841). Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #3038 from liancheng/hive-commands and squashes the following commits: 6db61e0 [Cheng Lian] Fixes remaining Hive commands --- .../spark/sql/catalyst/SparkSQLParser.scala | 14 +----------- .../sql/catalyst/plans/logical/commands.scala | 22 +++++++++---------- .../spark/sql/hive/HiveStrategies.scala | 2 ++ .../spark/sql/hive/execution/commands.scala | 16 ++++++++++++++ .../sql/hive/execution/HiveQuerySuite.scala | 19 +++++++++++++--- 5 files changed, 45 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala index 12e8346a64..f5c19ee69c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala @@ -137,7 +137,6 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr protected val LAZY = Keyword("LAZY") protected val SET = Keyword("SET") protected val TABLE = Keyword("TABLE") - protected val SOURCE = Keyword("SOURCE") protected val UNCACHE = Keyword("UNCACHE") protected implicit def asParser(k: Keyword): Parser[String] = @@ -152,8 +151,7 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr override val lexical = new SqlLexical(reservedWords) - override protected lazy val start: Parser[LogicalPlan] = - cache | uncache | set | shell | source | others + override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | others private lazy val cache: Parser[LogicalPlan] = CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ { @@ -171,16 +169,6 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr case input => SetCommandParser(input) } - private lazy val shell: Parser[LogicalPlan] = - "!" ~> restInput ^^ { - case input => ShellCommand(input.trim) - } - - private lazy val source: Parser[LogicalPlan] = - SOURCE ~> restInput ^^ { - case input => SourceCommand(input.trim) - } - private lazy val others: Parser[LogicalPlan] = wholeInput ^^ { case input => fallback(input) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala index b8ba2ee428..1d513d7789 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.types.StringType /** @@ -41,6 +41,15 @@ case class NativeCommand(cmd: String) extends Command { /** * Commands of the form "SET [key [= value] ]". */ +case class DFSCommand(kv: Option[(String, Option[String])]) extends Command { + override def output = Seq( + AttributeReference("DFS output", StringType, nullable = false)()) +} + +/** + * + * Commands of the form "SET [key [= value] ]". + */ case class SetCommand(kv: Option[(String, Option[String])]) extends Command { override def output = Seq( AttributeReference("", StringType, nullable = false)()) @@ -81,14 +90,3 @@ case class DescribeCommand( AttributeReference("data_type", StringType, nullable = false)(), AttributeReference("comment", StringType, nullable = false)()) } - -/** - * Returned for the "! shellCommand" command - */ -case class ShellCommand(cmd: String) extends Command - - -/** - * Returned for the "SOURCE file" command - */ -case class SourceCommand(filePath: String) extends Command diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index e59d4d536a..3207ad81d9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -206,6 +206,8 @@ private[hive] trait HiveStrategies { case hive.AddJar(path) => execution.AddJar(path) :: Nil + case hive.AddFile(path) => execution.AddFile(path) :: Nil + case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil case describe: logical.DescribeCommand => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 0fc674af31..903075edf7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -76,3 +76,19 @@ case class AddJar(path: String) extends LeafNode with Command { Seq.empty[Row] } } + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class AddFile(path: String) extends LeafNode with Command { + def hiveContext = sqlContext.asInstanceOf[HiveContext] + + override def output = Seq.empty + + override protected lazy val sideEffectResult: Seq[Row] = { + hiveContext.runSqlHive(s"ADD FILE $path") + hiveContext.sparkContext.addFile(path) + Seq.empty[Row] + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index ffe1f0b90f..5918f888c8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -17,11 +17,13 @@ package org.apache.spark.sql.hive.execution +import java.io.File + import scala.util.Try import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.spark.SparkException +import org.apache.spark.{SparkFiles, SparkException} import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.test.TestHive @@ -569,7 +571,7 @@ class HiveQuerySuite extends HiveComparisonTest { |WITH serdeproperties('s1'='9') """.stripMargin) } - // Now only verify 0.12.0, and ignore other versions due to binary compatability + // Now only verify 0.12.0, and ignore other versions due to binary compatibility // current TestSerDe.jar is from 0.12.0 if (HiveShim.version == "0.12.0") { sql(s"ADD JAR $testJar") @@ -581,6 +583,17 @@ class HiveQuerySuite extends HiveComparisonTest { sql("DROP TABLE alter1") } + test("ADD FILE command") { + val testFile = TestHive.getHiveFile("data/files/v1.txt").getCanonicalFile + sql(s"ADD FILE $testFile") + + val checkAddFileRDD = sparkContext.parallelize(1 to 2, 1).mapPartitions { _ => + Iterator.single(new File(SparkFiles.get("v1.txt")).canRead) + } + + assert(checkAddFileRDD.first()) + } + case class LogEntry(filename: String, message: String) case class LogFile(name: String) @@ -816,7 +829,7 @@ class HiveQuerySuite extends HiveComparisonTest { createQueryTest("select from thrift based table", "SELECT * from src_thrift") - + // Put tests that depend on specific Hive settings before these last two test, // since they modify /clear stuff. } -- GitLab