diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index f43d8bf646a9e027573bd2a16284f6b7fc466fb6..93832d4c713e5ac208b60ea27b764e6411c6ea01 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -116,6 +116,10 @@ class DataFrameReader(object):
         ...     opt2=1, opt3='str')
         >>> df.dtypes
         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
+        >>> df = sqlContext.read.format('json').load(['python/test_support/sql/people.json',
+        ...     'python/test_support/sql/people1.json'])
+        >>> df.dtypes
+        [('age', 'bigint'), ('aka', 'string'), ('name', 'string')]
         """
         if format is not None:
             self.format(format)
@@ -123,7 +127,15 @@ class DataFrameReader(object):
             self.schema(schema)
         self.options(**options)
         if path is not None:
-            return self._df(self._jreader.load(path))
+            if type(path) == list:
+                paths = path
+                gateway = self._sqlContext._sc._gateway
+                jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths))
+                for i in range(0, len(paths)):
+                    jpaths[i] = paths[i]
+                return self._df(self._jreader.load(jpaths))
+            else:
+                return self._df(self._jreader.load(path))
         else:
             return self._df(self._jreader.load())
 
diff --git a/python/test_support/sql/people1.json b/python/test_support/sql/people1.json
new file mode 100644
index 0000000000000000000000000000000000000000..6d217da77d1550a8212622e9058e409597c86ae6
--- /dev/null
+++ b/python/test_support/sql/people1.json
@@ -0,0 +1,2 @@
+{"name":"Jonathan", "aka": "John"}
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index eacdea2c1e5b3e9fd7cd9c4890665fd40fa0c41e..e8651a3569d6f7f5962480cf01aed6df54e81758 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -22,6 +22,7 @@ import java.util.Properties
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.StringUtils
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
@@ -123,6 +124,16 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
     DataFrame(sqlContext, LogicalRelation(resolved.relation))
   }
 
+  /**
+   * Loads input in as a [[DataFrame]], for data sources that support multiple paths.
+   * Only works if the source is a HadoopFsRelationProvider.
+   *
+   * @since 1.6.0
+   */
+  def load(paths: Array[String]): DataFrame = {
+    option("paths", paths.map(StringUtils.escapeString(_, '\\', ',')).mkString(",")).load()
+  }
+
   /**
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
    * url named table and connection properties.
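For illustration, a minimal usage sketch of the new Scala `load(paths: Array[String])` overload next to the unchanged single-path `load`; it is not part of the patch, and the directory names and the `sqlContext` value are assumed placeholders:

```scala
// Assumed placeholders: sqlContext and the /tmp/output/* directories.
val multi = sqlContext.read
  .format("json")
  .load(Array("/tmp/output/dir,1", "/tmp/output/dir2"))  // new multi-path overload

val single = sqlContext.read.format("json").load("/tmp/output/dir2")  // existing single-path API
```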
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 011724436621d3c2bf8917af6d1f55fa2a64602a..54beabbf63b5f39cce115768e11a46050360a3a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -24,6 +24,7 @@ import scala.language.{existentials, implicitConversions}
 import scala.util.{Success, Failure, Try}
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.StringUtils
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -89,7 +90,11 @@ object ResolvedDataSource extends Logging {
     val relation = userSpecifiedSchema match {
       case Some(schema: StructType) => clazz.newInstance() match {
         case dataSource: SchemaRelationProvider =>
-          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
+          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+          if (caseInsensitiveOptions.contains("paths")) {
+            throw new AnalysisException(s"$className does not support paths option.")
+          }
+          dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema)
         case dataSource: HadoopFsRelationProvider =>
           val maybePartitionsSchema = if (partitionColumns.isEmpty) {
             None
@@ -99,10 +104,19 @@ object ResolvedDataSource extends Logging {
 
           val caseInsensitiveOptions = new CaseInsensitiveMap(options)
           val paths = {
-            val patternPath = new Path(caseInsensitiveOptions("path"))
-            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
+            if (caseInsensitiveOptions.contains("paths") &&
+              caseInsensitiveOptions.contains("path")) {
+              throw new AnalysisException(s"Both path and paths options are present.")
+            }
+            caseInsensitiveOptions.get("paths")
+              .map(_.split("(?<!\\\\),").map(StringUtils.unEscapeString(_, '\\', ',')))
+              .getOrElse(Array(caseInsensitiveOptions("path")))
+              .flatMap{ pathString =>
+                val hdfsPath = new Path(pathString)
+                val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+                val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+                SparkHadoopUtil.get.globPathIfNecessary(qualified).map(_.toString)
+              }
           }
 
           val dataSchema =
@@ -122,14 +136,27 @@ object ResolvedDataSource extends Logging {
 
       case None => clazz.newInstance() match {
         case dataSource: RelationProvider =>
-          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
+          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+          if (caseInsensitiveOptions.contains("paths")) {
+            throw new AnalysisException(s"$className does not support paths option.")
+          }
+          dataSource.createRelation(sqlContext, caseInsensitiveOptions)
         case dataSource: HadoopFsRelationProvider =>
           val caseInsensitiveOptions = new CaseInsensitiveMap(options)
           val paths = {
-            val patternPath = new Path(caseInsensitiveOptions("path"))
-            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
+            if (caseInsensitiveOptions.contains("paths") &&
+              caseInsensitiveOptions.contains("path")) {
+              throw new AnalysisException(s"Both path and paths options are present.")
+            }
+            caseInsensitiveOptions.get("paths")
+              .map(_.split("(?<!\\\\),").map(StringUtils.unEscapeString(_, '\\', ',')))
+              .getOrElse(Array(caseInsensitiveOptions("path")))
+              .flatMap{ pathString =>
+                val hdfsPath = new Path(pathString)
+                val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+                val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+                SparkHadoopUtil.get.globPathIfNecessary(qualified).map(_.toString)
+              }
           }
           dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
         case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index d919877746c72c5ae37d00e90bc34d6dff896d0d..832ea02cb6e7755fe439561f50cca0c13d9e72bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -890,6 +890,24 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       .collect()
   }
 
+  test("SPARK-10185: Read multiple Hadoop Filesystem paths and paths with a comma in it") {
+    withTempDir { dir =>
+      val df1 = Seq((1, 22)).toDF("a", "b")
+      val dir1 = new File(dir, "dir,1").getCanonicalPath
+      df1.write.format("json").save(dir1)
+
+      val df2 = Seq((2, 23)).toDF("a", "b")
+      val dir2 = new File(dir, "dir2").getCanonicalPath
+      df2.write.format("json").save(dir2)
+
+      checkAnswer(sqlContext.read.format("json").load(Array(dir1, dir2)),
+        Row(1, 22) :: Row(2, 23) :: Nil)
+
+      checkAnswer(sqlContext.read.format("json").load(dir1),
+        Row(1, 22) :: Nil)
+    }
+  }
+
   test("SPARK-10034: Sort on Aggregate with aggregation expression named 'aggOrdering'") {
     val df = Seq(1 -> 2).toDF("i", "j")
     val query = df.groupBy('i)
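For illustration, a self-contained sketch of the comma-escaping round trip that the new "paths" option relies on: `DataFrameReader.load(paths)` escapes literal commas and joins the paths into one option value, and `ResolvedDataSource` splits on unescaped commas and unescapes them again. This is not part of the patch; it uses only the Hadoop `StringUtils` helpers the patch already imports, and the example paths are made up:

```scala
import org.apache.hadoop.util.StringUtils

object PathsOptionRoundTrip {
  def main(args: Array[String]): Unit = {
    // Example paths; the first contains a literal comma, as in the new test.
    val paths = Array("/data/dir,1", "/data/dir2")

    // Writer side (DataFrameReader.load): escape commas, then join with ','.
    val joined = paths.map(StringUtils.escapeString(_, '\\', ',')).mkString(",")
    // joined == "/data/dir\,1,/data/dir2"

    // Reader side (ResolvedDataSource): split only on commas not preceded by a
    // backslash, then undo the escaping.
    val restored = joined.split("(?<!\\\\),").map(StringUtils.unEscapeString(_, '\\', ','))

    assert(restored.sameElements(paths))
  }
}
```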