Commit 23468e7e authored by Cheng Lian, committed by Michael Armbrust

[SPARK-2220][SQL] Fixes remaining Hive commands

This PR adds support for the `ADD FILE` Hive command and removes `ShellCommand` and `SourceCommand`. The reasoning is described in [this SPARK-2220 comment](https://issues.apache.org/jira/browse/SPARK-2220?focusedCommentId=14191841&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14191841).

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #3038 from liancheng/hive-commands and squashes the following commits:

6db61e0 [Cheng Lian] Fixes remaining Hive commands
parent ea465af1
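Before the diff, a hedged sketch of the user-facing behaviour this PR restores, assuming a Spark 1.2-era HiveContext; the object name and file path below are illustrative, not taken from the PR:

import java.io.File
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}
import org.apache.spark.sql.hive.HiveContext

object AddFileSketch extends App {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[2]").setAppName("add-file-sketch"))
  val hiveContext = new HiveContext(sc)

  // Registers /tmp/lookup.txt with the Hive session and ships it to executors.
  hiveContext.sql("ADD FILE /tmp/lookup.txt")

  // Each executor resolves its local copy through SparkFiles, which is
  // exactly what the new HiveQuerySuite test in this PR checks.
  val readable = sc.parallelize(1 to 2, 1)
    .map(_ => new File(SparkFiles.get("lookup.txt")).canRead)
    .first()
  println(s"lookup.txt readable on executors: $readable")

  sc.stop()
}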
@@ -137,7 +137,6 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser
   protected val LAZY = Keyword("LAZY")
   protected val SET = Keyword("SET")
   protected val TABLE = Keyword("TABLE")
-  protected val SOURCE = Keyword("SOURCE")
   protected val UNCACHE = Keyword("UNCACHE")

   protected implicit def asParser(k: Keyword): Parser[String] =
@@ -152,8 +151,7 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser
   override val lexical = new SqlLexical(reservedWords)

-  override protected lazy val start: Parser[LogicalPlan] =
-    cache | uncache | set | shell | source | others
+  override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | others

   private lazy val cache: Parser[LogicalPlan] =
     CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ {
@@ -171,16 +169,6 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser
       case input => SetCommandParser(input)
     }

-  private lazy val shell: Parser[LogicalPlan] =
-    "!" ~> restInput ^^ {
-      case input => ShellCommand(input.trim)
-    }
-
-  private lazy val source: Parser[LogicalPlan] =
-    SOURCE ~> restInput ^^ {
-      case input => SourceCommand(input.trim)
-    }
-
   private lazy val others: Parser[LogicalPlan] =
     wholeInput ^^ {
       case input => fallback(input)
...
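To make the dispatch pattern above concrete: each alternative in `cache | uncache | set | others` is tried in order, and `others` forwards the whole input to the fallback parser, which is where `SOURCE` and `!` inputs now land once their special cases are gone. A minimal, self-contained sketch of that pattern (not Spark source; assumes the scala-parser-combinators library, and all names and demo inputs are illustrative):

import scala.util.parsing.combinator.RegexParsers

object MiniSqlDispatch extends RegexParsers {
  // Stand-in for SparkSQLParser's fallback: String => LogicalPlan.
  private val fallback: String => String = input => s"fallback($input)"

  private lazy val set: Parser[String] =
    "SET" ~> """.*""".r ^^ { kv => s"set(${kv.trim})" }

  // Anything the special-case parsers reject falls through here.
  private lazy val others: Parser[String] =
    """(?s).*""".r ^^ fallback

  // Alternatives are tried left to right, like `cache | uncache | set | others`.
  lazy val start: Parser[String] = set | others

  def apply(input: String): String = parseAll(start, input).get
}

object MiniSqlDispatchDemo extends App {
  println(MiniSqlDispatch("SET spark.x=1"))      // set(spark.x=1)
  println(MiniSqlDispatch("SELECT * FROM src"))  // fallback(SELECT * FROM src)
}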
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.catalyst.plans.logical

-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.types.StringType

 /**
@@ -41,6 +41,15 @@ case class NativeCommand(cmd: String) extends Command {
 /**
  * Commands of the form "SET [key [= value] ]".
  */
+case class DFSCommand(kv: Option[(String, Option[String])]) extends Command {
+  override def output = Seq(
+    AttributeReference("DFS output", StringType, nullable = false)())
+}
+
+/**
+ *
+ * Commands of the form "SET [key [= value] ]".
+ */
 case class SetCommand(kv: Option[(String, Option[String])]) extends Command {
   override def output = Seq(
     AttributeReference("", StringType, nullable = false)())
@@ -81,14 +90,3 @@ case class DescribeCommand(
     AttributeReference("data_type", StringType, nullable = false)(),
     AttributeReference("comment", StringType, nullable = false)())
 }
-
-/**
- * Returned for the "! shellCommand" command
- */
-case class ShellCommand(cmd: String) extends Command
-
-
-/**
- * Returned for the "SOURCE file" command
- */
-case class SourceCommand(filePath: String) extends Command
@@ -206,6 +206,8 @@ private[hive] trait HiveStrategies {
       case hive.AddJar(path) => execution.AddJar(path) :: Nil

+      case hive.AddFile(path) => execution.AddFile(path) :: Nil
+
       case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil

       case describe: logical.DescribeCommand =>
...
@@ -76,3 +76,19 @@ case class AddJar(path: String) extends LeafNode with Command {
     Seq.empty[Row]
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class AddFile(path: String) extends LeafNode with Command {
+  def hiveContext = sqlContext.asInstanceOf[HiveContext]
+
+  override def output = Seq.empty
+
+  override protected lazy val sideEffectResult: Seq[Row] = {
+    hiveContext.runSqlHive(s"ADD FILE $path")
+    hiveContext.sparkContext.addFile(path)
+    Seq.empty[Row]
+  }
+}
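Note the dual side effect in AddFile above: runSqlHive(s"ADD FILE $path") registers the file as a resource of the current Hive session, presumably so that Hive-side operations (for example TRANSFORM scripts) can refer to it, while sparkContext.addFile(path) ships the same file to every executor, where it can be resolved locally via SparkFiles.get. The new HiveQuerySuite test below exercises that second path.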
@@ -17,11 +17,13 @@
 package org.apache.spark.sql.hive.execution

+import java.io.File
+
 import scala.util.Try

 import org.apache.hadoop.hive.conf.HiveConf.ConfVars

-import org.apache.spark.SparkException
+import org.apache.spark.{SparkFiles, SparkException}
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.test.TestHive
@@ -569,7 +571,7 @@ class HiveQuerySuite extends HiveComparisonTest {
         |WITH serdeproperties('s1'='9')
       """.stripMargin)
    }
-    // Now only verify 0.12.0, and ignore other versions due to binary compatability
+    // Now only verify 0.12.0, and ignore other versions due to binary compatibility
    // current TestSerDe.jar is from 0.12.0
    if (HiveShim.version == "0.12.0") {
      sql(s"ADD JAR $testJar")
@@ -581,6 +583,17 @@ class HiveQuerySuite extends HiveComparisonTest {
     sql("DROP TABLE alter1")
   }

+  test("ADD FILE command") {
+    val testFile = TestHive.getHiveFile("data/files/v1.txt").getCanonicalFile
+    sql(s"ADD FILE $testFile")
+
+    val checkAddFileRDD = sparkContext.parallelize(1 to 2, 1).mapPartitions { _ =>
+      Iterator.single(new File(SparkFiles.get("v1.txt")).canRead)
+    }
+
+    assert(checkAddFileRDD.first())
+  }
+
   case class LogEntry(filename: String, message: String)
   case class LogFile(name: String)
@@ -816,7 +829,7 @@ class HiveQuerySuite extends HiveComparisonTest {
   createQueryTest("select from thrift based table",
     "SELECT * from src_thrift")

   // Put tests that depend on specific Hive settings before these last two test,
   // since they modify /clear stuff.
...