Skip to content
Snippets Groups Projects
Unverified Commit 986a3b8b authored by Dongjoon Hyun's avatar Dongjoon Hyun Committed by Sean Owen
Browse files

[SPARK-17796][SQL] Support wildcard character in filename for LOAD DATA LOCAL INPATH

## What changes were proposed in this pull request?

Currently, Spark 2.0 raises an `input path does not exist` AnalysisException if the file name contains '*'. It is misleading since it occurs when there exist some matched files. Also, it was a supported feature in Spark 1.6.2. This PR aims to support wildcard characters in filename for `LOAD DATA LOCAL INPATH` SQL command like Spark 1.6.2.

**Reported Error Scenario**
```scala
scala> sql("CREATE TABLE t(a string)")
res0: org.apache.spark.sql.DataFrame = []

scala> sql("LOAD DATA LOCAL INPATH '/tmp/x*' INTO TABLE t")
org.apache.spark.sql.AnalysisException: LOAD DATA input path does not exist: /tmp/x*;
```

## How was this patch tested?

Pass the Jenkins test with a new test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15376 from dongjoon-hyun/SPARK-17796.
parent c2c107ab
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command ...@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command
import java.io.File import java.io.File
import java.net.URI import java.net.URI
import java.nio.file.FileSystems
import java.util.Date import java.util.Date
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
...@@ -245,7 +246,27 @@ case class LoadDataCommand( ...@@ -245,7 +246,27 @@ case class LoadDataCommand(
val loadPath = val loadPath =
if (isLocal) { if (isLocal) {
val uri = Utils.resolveURI(path) val uri = Utils.resolveURI(path)
if (!new File(uri.getPath()).exists()) { val filePath = uri.getPath()
val exists = if (filePath.contains("*")) {
val fileSystem = FileSystems.getDefault
val pathPattern = fileSystem.getPath(filePath)
val dir = pathPattern.getParent.toString
if (dir.contains("*")) {
throw new AnalysisException(
s"LOAD DATA input path allows only filename wildcard: $path")
}
val files = new File(dir).listFiles()
if (files == null) {
false
} else {
val matcher = fileSystem.getPathMatcher("glob:" + pathPattern.toAbsolutePath)
files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
}
} else {
new File(filePath).exists()
}
if (!exists) {
throw new AnalysisException(s"LOAD DATA input path does not exist: $path") throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
} }
uri uri
......
...@@ -17,11 +17,14 @@ ...@@ -17,11 +17,14 @@
package org.apache.spark.sql.hive.execution package org.apache.spark.sql.hive.execution
import java.io.{File, PrintWriter}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp} import java.sql.{Date, Timestamp}
import scala.sys.process.{Process, ProcessLogger} import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try import scala.util.Try
import com.google.common.io.Files
import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.Path
import org.apache.spark.sql._ import org.apache.spark.sql._
...@@ -1917,6 +1920,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { ...@@ -1917,6 +1920,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
} }
} }
test("SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL INPATH") {
withTempDir { dir =>
for (i <- 1 to 3) {
Files.write(s"$i", new File(s"$dir/part-r-0000$i"), StandardCharsets.UTF_8)
}
for (i <- 5 to 7) {
Files.write(s"$i", new File(s"$dir/part-s-0000$i"), StandardCharsets.UTF_8)
}
withTable("load_t") {
sql("CREATE TABLE load_t (a STRING)")
sql(s"LOAD DATA LOCAL INPATH '$dir/*part-r*' INTO TABLE load_t")
checkAnswer(sql("SELECT * FROM load_t"), Seq(Row("1"), Row("2"), Row("3")))
val m = intercept[AnalysisException] {
sql("LOAD DATA LOCAL INPATH '/non-exist-folder/*part*' INTO TABLE load_t")
}.getMessage
assert(m.contains("LOAD DATA input path does not exist"))
val m2 = intercept[AnalysisException] {
sql(s"LOAD DATA LOCAL INPATH '$dir*/*part*' INTO TABLE load_t")
}.getMessage
assert(m2.contains("LOAD DATA input path allows only filename wildcard"))
}
}
}
def testCommandAvailable(command: String): Boolean = { def testCommandAvailable(command: String): Boolean = {
val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue()) val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
attempt.isSuccess && attempt.get == 0 attempt.isSuccess && attempt.get == 0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment