Commit 01659bc5 authored by Xin Wu, committed by Cheng Lian

[SPARK-15431][SQL] Support LIST FILE(s)|JAR(s) command natively

## What changes were proposed in this pull request?
Currently the command `ADD FILE|JAR <filepath | jarpath>` is supported natively in Spark SQL. However, once a file/jar has been added this way, it cannot be looked up with a `LIST FILE(s)|JAR(s)` command: in Spark SQL the `LIST` command is passed through to the Hive command processor, and in the Spark shell it is simply not supported. There is therefore no way for users to find out which files/jars have been added to the Spark context.
See the [Hive CLI commands](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli) for the corresponding Hive behavior.

This PR adds native support for the following commands:
`LIST (FILE[s] [filepath ...] | JAR[s] [jarfile ...])`

### For example:
##### LIST FILE(s)
```
scala> spark.sql("add file hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt")
res1: org.apache.spark.sql.DataFrame = []
scala> spark.sql("add file hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt")
res2: org.apache.spark.sql.DataFrame = []

scala> spark.sql("list file hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt").show(false)
+----------------------------------------------+
|result                                        |
+----------------------------------------------+
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt|
+----------------------------------------------+

scala> spark.sql("list files").show(false)
+----------------------------------------------+
|result                                        |
+----------------------------------------------+
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt|
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt |
+----------------------------------------------+
```

##### LIST JAR(s)
```
scala> spark.sql("add jar /Users/xinwu/spark/core/src/test/resources/TestUDTF.jar")
res9: org.apache.spark.sql.DataFrame = [result: int]

scala> spark.sql("list jar TestUDTF.jar").show(false)
+---------------------------------------------+
|result                                       |
+---------------------------------------------+
|spark://192.168.1.234:50131/jars/TestUDTF.jar|
+---------------------------------------------+

scala> spark.sql("list jars").show(false)
+---------------------------------------------+
|result                                       |
+---------------------------------------------+
|spark://192.168.1.234:50131/jars/TestUDTF.jar|
+---------------------------------------------+
```
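
##### SparkContext API
This patch also adds `listFiles()` and `listJars()` to `SparkContext` (see the SparkContext diff below). A minimal sketch of using them directly from the Scala API; the paths and printed values are illustrative, not output captured from this patch:
```scala
// Illustrative only: any file/jar reachable from the driver works.
sc.addFile("/tmp/test.txt")
sc.addJar("/tmp/TestUDTF.jar")

// The new APIs return the keys of SparkContext's internal addedFiles/addedJars maps.
sc.listFiles().foreach(println)  // e.g. file:/tmp/test.txt
sc.listJars().foreach(println)   // e.g. spark://<driver-host>:<port>/jars/TestUDTF.jar
```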
## How was this patch tested?
New test cases are added for the Spark SQL, Spark shell, and SparkContext API code paths.

Author: Xin Wu <xinwu@us.ibm.com>
Author: xin Wu <xinwu@us.ibm.com>

Closes #13212 from xwu0226/list_command.
parent a8e97d17
Showing changed files with 149 additions and 17 deletions
```diff
@@ -1386,6 +1386,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     addFile(path, false)
   }
 
+  /**
+   * Returns a list of file paths that are added to resources.
+   */
+  def listFiles(): Seq[String] = addedFiles.keySet.toSeq
+
   /**
    * Add a file to be downloaded with this Spark job on every node.
    * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
@@ -1724,6 +1729,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     postEnvironmentUpdate()
   }
 
+  /**
+   * Returns a list of jar files that are added to resources.
+   */
+  def listJars(): Seq[String] = addedJars.keySet.toSeq
+
   // Shut down the SparkContext.
   def stop() {
     if (LiveListenerBus.withinListenerThread.value) {
```
File added
```diff
@@ -108,7 +108,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
     assert(byteArray2.length === 0)
   }
 
-  test("addFile works") {
+  test("basic case for addFile and listFiles") {
     val dir = Utils.createTempDir()
 
     val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
@@ -156,6 +156,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
         }
         x
       }).count()
+      assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
+    } finally {
+      sc.stop()
+    }
+  }
+
+  test("add and list jar files") {
+    val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar")
+    try {
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      sc.addJar(jarPath.toString)
+      assert(sc.listJars().filter(_.contains("TestUDTF.jar")).size == 1)
     } finally {
       sc.stop()
     }
```
```diff
@@ -117,7 +117,7 @@ statement
         tableIdentifier partitionSpec?                                 #loadData
     | TRUNCATE TABLE tableIdentifier partitionSpec?
         (COLUMNS identifierList)?                                      #truncateTable
-    | ADD identifier .*?                                               #addResource
+    | op=(ADD | LIST) identifier .*?                                   #manageResource
     | SET ROLE .*?                                                     #failNativeCommand
     | SET .*?                                                          #setConfiguration
     | RESET                                                            #resetConfiguration
@@ -642,7 +642,7 @@ nonReserved
     | SORT | CLUSTER | DISTRIBUTE | UNSET | TBLPROPERTIES | SKEWED | STORED | DIRECTORIES | LOCATION
     | EXCHANGE | ARCHIVE | UNARCHIVE | FILEFORMAT | TOUCH | COMPACT | CONCATENATE | CHANGE
     | CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT
-    | DBPROPERTIES | DFS | TRUNCATE | COMPUTE
+    | DBPROPERTIES | DFS | TRUNCATE | COMPUTE | LIST
    | STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER
    | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
    | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION | LOCAL | INPATH
@@ -843,6 +843,7 @@ DFS: 'DFS';
 TRUNCATE: 'TRUNCATE';
 ANALYZE: 'ANALYZE';
 COMPUTE: 'COMPUTE';
+LIST: 'LIST';
 STATISTICS: 'STATISTICS';
 PARTITIONED: 'PARTITIONED';
 EXTERNAL: 'EXTERNAL';
```
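Purely as an illustration of what the relaxed `manageResource` rule now accepts (the paths below are examples, not part of the grammar change):
```scala
// Each of these statements is matched by the single rule
//   op=(ADD | LIST) identifier .*?   #manageResource
// and dispatched in SparkSqlAstBuilder based on op and the identifier.
spark.sql("ADD FILE /tmp/test.txt")
spark.sql("ADD JAR /tmp/TestUDTF.jar")
spark.sql("LIST FILES")                   // no arguments: list all added files
spark.sql("LIST JAR /tmp/TestUDTF.jar")   // with arguments: keep only matching entries
```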
```diff
@@ -774,13 +774,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create an [[AddJarCommand]] or [[AddFileCommand]] command depending on the requested resource.
+   * Create an [[AddFileCommand]], [[AddJarCommand]], [[ListFilesCommand]] or [[ListJarsCommand]]
+   * command depending on the requested operation on resources.
+   * Expected format:
+   * {{{
+   *   ADD (FILE[S] <filepath ...> | JAR[S] <jarpath ...>)
+   *   LIST (FILE[S] [filepath ...] | JAR[S] [jarpath ...])
+   * }}}
    */
-  override def visitAddResource(ctx: AddResourceContext): LogicalPlan = withOrigin(ctx) {
-    ctx.identifier.getText.toLowerCase match {
-      case "file" => AddFileCommand(remainder(ctx.identifier).trim)
-      case "jar" => AddJarCommand(remainder(ctx.identifier).trim)
-      case other => throw operationNotAllowed(s"ADD with resource type '$other'", ctx)
+  override def visitManageResource(ctx: ManageResourceContext): LogicalPlan = withOrigin(ctx) {
+    val maybePaths = remainder(ctx.identifier).trim
+    ctx.op.getType match {
+      case SqlBaseParser.ADD =>
+        ctx.identifier.getText.toLowerCase match {
+          case "file" => AddFileCommand(maybePaths)
+          case "jar" => AddJarCommand(maybePaths)
+          case other => throw operationNotAllowed(s"ADD with resource type '$other'", ctx)
+        }
+      case SqlBaseParser.LIST =>
+        ctx.identifier.getText.toLowerCase match {
+          case "files" | "file" =>
+            if (maybePaths.length > 0) {
+              ListFilesCommand(maybePaths.split("\\s+"))
+            } else {
+              ListFilesCommand()
+            }
+          case "jars" | "jar" =>
+            if (maybePaths.length > 0) {
+              ListJarsCommand(maybePaths.split("\\s+"))
+            } else {
+              ListJarsCommand()
+            }
+          case other => throw operationNotAllowed(s"LIST with resource type '$other'", ctx)
+        }
+      case _ => throw operationNotAllowed(s"Other types of operation on resources", ctx)
     }
   }
```
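For illustration only (this helper is not part of the patch), the argument handling in `visitManageResource` boils down to: an empty remainder after the FILE/JAR keyword lists every registered resource, while a non-empty remainder is split on whitespace into individual paths.
```scala
// Hypothetical standalone restatement of the LIST argument handling above.
def parseListArgs(remainder: String): Seq[String] = {
  val trimmed = remainder.trim
  if (trimmed.isEmpty) Seq.empty else trimmed.split("\\s+").toSeq
}

// parseListArgs("")                        => Seq()                         -> ListFilesCommand()
// parseListArgs(" /tmp/a.txt  /tmp/b.txt") => Seq("/tmp/a.txt", "/tmp/b.txt")
//                                             -> ListFilesCommand(Seq("/tmp/a.txt", "/tmp/b.txt"))
```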
```diff
@@ -17,9 +17,14 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.io.File
+import java.net.URI
+
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 
 /**
  * Adds a jar to the current session so it can be used (for UDFs or serdes).
@@ -46,3 +51,51 @@ case class AddFileCommand(path: String) extends RunnableCommand {
     Seq.empty[Row]
   }
 }
+
+/**
+ * Returns a list of file paths that are added to resources.
+ * If file paths are provided, return the ones that are added to resources.
+ */
+case class ListFilesCommand(files: Seq[String] = Seq.empty[String]) extends RunnableCommand {
+  override val output: Seq[Attribute] = {
+    AttributeReference("Results", StringType, nullable = false)() :: Nil
+  }
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val fileList = sparkSession.sparkContext.listFiles()
+    if (files.size > 0) {
+      files.map { f =>
+        val uri = new URI(f)
+        val schemeCorrectedPath = uri.getScheme match {
+          case null | "local" => new File(f).getCanonicalFile.toURI.toString
+          case _ => f
+        }
+        new Path(schemeCorrectedPath).toUri.toString
+      }.collect {
+        case f if fileList.contains(f) => f
+      }.map(Row(_))
+    } else {
+      fileList.map(Row(_))
+    }
+  }
+}
+
+/**
+ * Returns a list of jar files that are added to resources.
+ * If jar files are provided, return the ones that are added to resources.
+ */
+case class ListJarsCommand(jars: Seq[String] = Seq.empty[String]) extends RunnableCommand {
+  override val output: Seq[Attribute] = {
+    AttributeReference("Results", StringType, nullable = false)() :: Nil
+  }
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val jarList = sparkSession.sparkContext.listJars()
+    if (jars.nonEmpty) {
+      for {
+        jarName <- jars.map(f => new Path(f).getName)
+        jarPath <- jarList if jarPath.contains(jarName)
+      } yield Row(jarPath)
+    } else {
+      jarList.map(Row(_))
+    }
+  }
+}
```
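`ListFilesCommand` only reports a user-supplied path if it normalizes to the exact string recorded by `SparkContext.addFile`. A hypothetical standalone sketch of that normalization (it simply restates the scheme-correction logic above and is not part of the patch):
```scala
import java.io.File
import java.net.URI

import org.apache.hadoop.fs.Path

// Paths without a scheme (or with the "local" scheme) are canonicalized to file: URIs,
// so "LIST FILE /tmp/a.txt" can match the "file:/tmp/a.txt" entry recorded at ADD FILE time.
// Fully qualified paths (hdfs://..., etc.) are passed through unchanged.
def normalizeListedPath(path: String): String = {
  val schemeCorrected = new URI(path).getScheme match {
    case null | "local" => new File(path).getCanonicalFile.toURI.toString
    case _ => path
  }
  new Path(schemeCorrected).toUri.toString
}
```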
```diff
@@ -32,8 +32,7 @@ import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.exec.Utilities
-import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, CommandProcessor}
-import org.apache.hadoop.hive.ql.processors.{CommandProcessorFactory, ResetProcessor, SetProcessor}
+import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.thrift.transport.TSocket
@@ -295,9 +294,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
       System.exit(0)
     }
     if (tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
-      cmd_trimmed.startsWith("!") ||
-      tokens(0).toLowerCase.equals("list") ||
-      isRemoteMode) {
+      cmd_trimmed.startsWith("!") || isRemoteMode) {
       val start = System.currentTimeMillis()
       super.processCmd(cmd)
       val end = System.currentTimeMillis()
@@ -312,7 +309,8 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
       if (proc != null) {
         // scalastyle:off println
         if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor] ||
-          proc.isInstanceOf[AddResourceProcessor] || proc.isInstanceOf[ResetProcessor]) {
+          proc.isInstanceOf[AddResourceProcessor] || proc.isInstanceOf[ListResourceProcessor] ||
+          proc.isInstanceOf[ResetProcessor]) {
           val driver = new SparkSQLDriver
           driver.init()
```
File added
```diff
@@ -238,4 +238,23 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
     runCliWithin(2.minute, Seq("-e", "!echo \"This is a test for Spark-11624\";"))(
       "" -> "This is a test for Spark-11624")
   }
+
+  test("list jars") {
+    val jarFile = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar")
+    runCliWithin(2.minute)(
+      s"ADD JAR $jarFile" -> "",
+      s"LIST JARS" -> "TestUDTF.jar",
+      s"List JAR $jarFile" -> "TestUDTF.jar"
+    )
+  }
+
+  test("list files") {
+    val dataFilePath = Thread.currentThread().getContextClassLoader
+      .getResource("data/files/small_kv.txt")
+    runCliWithin(2.minute)(
+      s"ADD FILE $dataFilePath" -> "",
+      s"LIST FILES" -> "small_kv.txt",
+      s"LIST FILE $dataFilePath" -> "small_kv.txt"
+    )
+  }
 }
```
```diff
@@ -876,6 +876,13 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE t1""")
     sql("select * from src join t1 on src.key = t1.a")
     sql("DROP TABLE t1")
+    assert(sql("list jars").
+      filter(_.getString(0).contains("hive-hcatalog-core-0.13.1.jar")).count() > 0)
+    assert(sql("list jar").
+      filter(_.getString(0).contains("hive-hcatalog-core-0.13.1.jar")).count() > 0)
+    val testJar2 = TestHive.getHiveFile("TestUDTF.jar").getCanonicalPath
+    sql(s"ADD JAR $testJar2")
+    assert(sql(s"list jar $testJar2").count() == 1)
   }
 
   test("CREATE TEMPORARY FUNCTION") {
@@ -899,6 +906,11 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     }
     assert(checkAddFileRDD.first())
+    assert(sql("list files").
+      filter(_.getString(0).contains("data/files/v1.txt")).count() > 0)
+    assert(sql("list file").
+      filter(_.getString(0).contains("data/files/v1.txt")).count() > 0)
+    assert(sql(s"list file $testFile").count() == 1)
   }
 
   createQueryTest("dynamic_partition",
```