From 986a3b8b5bedb1d64e2cf7c95bfdf5505f3e8c69 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun <dongjoon@apache.org> Date: Thu, 20 Oct 2016 09:53:12 +0100 Subject: [PATCH] [SPARK-17796][SQL] Support wildcard character in filename for LOAD DATA LOCAL INPATH ## What changes were proposed in this pull request? Currently, Spark 2.0 raises an `input path does not exist` AnalysisException if the file name contains '*'. It is misleading since it occurs when there exist some matched files. Also, it was a supported feature in Spark 1.6.2. This PR aims to support wildcard characters in filename for `LOAD DATA LOCAL INPATH` SQL command like Spark 1.6.2. **Reported Error Scenario** ```scala scala> sql("CREATE TABLE t(a string)") res0: org.apache.spark.sql.DataFrame = [] scala> sql("LOAD DATA LOCAL INPATH '/tmp/x*' INTO TABLE t") org.apache.spark.sql.AnalysisException: LOAD DATA input path does not exist: /tmp/x*; ``` ## How was this patch tested? Pass the Jenkins test with a new test case. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #15376 from dongjoon-hyun/SPARK-17796. --- .../spark/sql/execution/command/tables.scala | 23 +++++++++++++- .../sql/hive/execution/SQLQuerySuite.scala | 30 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 403b479a0e..4c0675adb4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command import java.io.File import java.net.URI +import java.nio.file.FileSystems import java.util.Date import scala.collection.mutable.ArrayBuffer @@ -245,7 +246,27 @@ case class LoadDataCommand( val loadPath = if (isLocal) { val uri = Utils.resolveURI(path) - if (!new File(uri.getPath()).exists()) { + val filePath = uri.getPath() + val exists = if (filePath.contains("*")) { + val fileSystem = FileSystems.getDefault + val pathPattern = fileSystem.getPath(filePath) + val dir = pathPattern.getParent.toString + if (dir.contains("*")) { + throw new AnalysisException( + s"LOAD DATA input path allows only filename wildcard: $path") + } + + val files = new File(dir).listFiles() + if (files == null) { + false + } else { + val matcher = fileSystem.getPathMatcher("glob:" + pathPattern.toAbsolutePath) + files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) + } + } else { + new File(filePath).exists() + } + if (!exists) { throw new AnalysisException(s"LOAD DATA input path does not exist: $path") } uri diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index e26b6b57ef..495b4f874a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -17,11 +17,14 @@ package org.apache.spark.sql.hive.execution +import java.io.{File, PrintWriter} +import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import scala.sys.process.{Process, ProcessLogger} import scala.util.Try +import com.google.common.io.Files import org.apache.hadoop.fs.Path import org.apache.spark.sql._ @@ -1917,6 +1920,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } + test("SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL INPATH") { + withTempDir { dir => + for (i <- 1 to 3) { + Files.write(s"$i", new File(s"$dir/part-r-0000$i"), StandardCharsets.UTF_8) + } + for (i <- 5 to 7) { + Files.write(s"$i", new File(s"$dir/part-s-0000$i"), StandardCharsets.UTF_8) + } + + withTable("load_t") { + sql("CREATE TABLE load_t (a STRING)") + sql(s"LOAD DATA LOCAL INPATH '$dir/*part-r*' INTO TABLE load_t") + checkAnswer(sql("SELECT * FROM load_t"), Seq(Row("1"), Row("2"), Row("3"))) + + val m = intercept[AnalysisException] { + sql("LOAD DATA LOCAL INPATH '/non-exist-folder/*part*' INTO TABLE load_t") + }.getMessage + assert(m.contains("LOAD DATA input path does not exist")) + + val m2 = intercept[AnalysisException] { + sql(s"LOAD DATA LOCAL INPATH '$dir*/*part*' INTO TABLE load_t") + }.getMessage + assert(m2.contains("LOAD DATA input path allows only filename wildcard")) + } + } + } + def testCommandAvailable(command: String): Boolean = { val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue()) attempt.isSuccess && attempt.get == 0 -- GitLab