diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala index 12e8346a6445d5ac0e5433b8de6986e5122adccb..f5c19ee69c37ad23f014941620c622e61f5fbae2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala @@ -137,7 +137,6 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr protected val LAZY = Keyword("LAZY") protected val SET = Keyword("SET") protected val TABLE = Keyword("TABLE") - protected val SOURCE = Keyword("SOURCE") protected val UNCACHE = Keyword("UNCACHE") protected implicit def asParser(k: Keyword): Parser[String] = @@ -152,8 +151,7 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr override val lexical = new SqlLexical(reservedWords) - override protected lazy val start: Parser[LogicalPlan] = - cache | uncache | set | shell | source | others + override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | others private lazy val cache: Parser[LogicalPlan] = CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ { @@ -171,16 +169,6 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr case input => SetCommandParser(input) } - private lazy val shell: Parser[LogicalPlan] = - "!" 
~> restInput ^^ { - case input => ShellCommand(input.trim) - } - - private lazy val source: Parser[LogicalPlan] = - SOURCE ~> restInput ^^ { - case input => SourceCommand(input.trim) - } - private lazy val others: Parser[LogicalPlan] = wholeInput ^^ { case input => fallback(input) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala index b8ba2ee428a20f68b9f2d307823d722f2680801b..1d513d77897636e49e38afa0092e1a18697ce1d4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.types.StringType /** @@ -41,6 +41,15 @@ case class NativeCommand(cmd: String) extends Command { /** - * Commands of the form "SET [key [= value] ]". + * Commands of the form "dfs <command> [args]". */ +case class DFSCommand(kv: Option[(String, Option[String])]) extends Command { + override def output = Seq( + AttributeReference("DFS output", StringType, nullable = false)()) +} + +/** + * + * Commands of the form "SET [key [= value] ]". + */ case class SetCommand(kv: Option[(String, Option[String])]) extends Command { override def output = Seq( AttributeReference("", StringType, nullable = false)()) @@ -81,14 +90,3 @@ case class DescribeCommand( AttributeReference("data_type", StringType, nullable = false)(), AttributeReference("comment", StringType, nullable = false)()) } - -/** - * Returned for the "! 
shellCommand" command - */ -case class ShellCommand(cmd: String) extends Command - - -/** - * Returned for the "SOURCE file" command - */ -case class SourceCommand(filePath: String) extends Command diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index e59d4d536a0af370e1953251272b7e25de2ccb3c..3207ad81d957102ba354cc539826b0c040a24d8c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -206,6 +206,8 @@ private[hive] trait HiveStrategies { case hive.AddJar(path) => execution.AddJar(path) :: Nil + case hive.AddFile(path) => execution.AddFile(path) :: Nil + case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil case describe: logical.DescribeCommand => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 0fc674af318857582ae650b21d730ff166a40edf..903075edf7e04b0261c3faaed5b567af3f663557 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -76,3 +76,19 @@ case class AddJar(path: String) extends LeafNode with Command { Seq.empty[Row] } } + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class AddFile(path: String) extends LeafNode with Command { + def hiveContext = sqlContext.asInstanceOf[HiveContext] + + override def output = Seq.empty + + override protected lazy val sideEffectResult: Seq[Row] = { + hiveContext.runSqlHive(s"ADD FILE $path") + hiveContext.sparkContext.addFile(path) + Seq.empty[Row] + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 
ffe1f0b90fcd014398125e2061b422348327d7c5..5918f888c8f4cbd5b327cab7e7b554929cd06c27 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -17,11 +17,13 @@ package org.apache.spark.sql.hive.execution +import java.io.File + import scala.util.Try import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.spark.SparkException +import org.apache.spark.{SparkFiles, SparkException} import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.test.TestHive @@ -569,7 +571,7 @@ class HiveQuerySuite extends HiveComparisonTest { |WITH serdeproperties('s1'='9') """.stripMargin) } - // Now only verify 0.12.0, and ignore other versions due to binary compatability + // Now only verify 0.12.0, and ignore other versions due to binary compatibility // current TestSerDe.jar is from 0.12.0 if (HiveShim.version == "0.12.0") { sql(s"ADD JAR $testJar") @@ -581,6 +583,17 @@ class HiveQuerySuite extends HiveComparisonTest { sql("DROP TABLE alter1") } + test("ADD FILE command") { + val testFile = TestHive.getHiveFile("data/files/v1.txt").getCanonicalFile + sql(s"ADD FILE $testFile") + + val checkAddFileRDD = sparkContext.parallelize(1 to 2, 1).mapPartitions { _ => + Iterator.single(new File(SparkFiles.get("v1.txt")).canRead) + } + + assert(checkAddFileRDD.first()) + } + case class LogEntry(filename: String, message: String) case class LogFile(name: String) @@ -816,7 +829,7 @@ class HiveQuerySuite extends HiveComparisonTest { createQueryTest("select from thrift based table", "SELECT * from src_thrift") - + // Put tests that depend on specific Hive settings before these last two test, // since they modify /clear stuff. }