Commit 32be51fb authored by Jurriaan Pruis, committed by Reynold Xin

[SPARK-15323][SPARK-14463][SQL] Fix reading of partitioned format=text datasets

https://issues.apache.org/jira/browse/SPARK-15323

I was using partitioned text datasets in Spark 1.6.1 but it broke in Spark 2.0.0.

It would be logical if you could also write partitioned text datasets,
but I'm not entirely sure how to solve that with the new Dataset implementation.

It also doesn't work using `sqlContext.read.text`, since that method returns a `Dataset[String]`.
See https://issues.apache.org/jira/browse/SPARK-14463 for that issue.
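
For illustration, a minimal sketch of the two read paths this change affects, as run in spark-shell (where `spark` is predefined); the `/tmp/logs` path and the `year` partition column are hypothetical:

```scala
import org.apache.spark.sql.{DataFrame, Dataset}

// Hive-style partitioned layout assumed: /tmp/logs/year=2015/part-00000, etc.

// The generic loader keeps the partition column discovered from the paths,
// so the resulting schema is (value STRING, year STRING).
val df: DataFrame = spark.read.format("text").load("/tmp/logs")

// read.text returns a Dataset[String], so only the text column survives;
// this is the SPARK-14463 side of the issue.
val ds: Dataset[String] = spark.read.text("/tmp/logs")
```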

Author: Jurriaan Pruis <email@jurriaanpruis.nl>

Closes #13104 from jurriaan/fix-partitioned-text-reads.
parent 84b23453
@@ -457,7 +457,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   @scala.annotation.varargs
   def text(paths: String*): Dataset[String] = {
-    format("text").load(paths : _*).as[String](sparkSession.implicits.newStringEncoder)
+    format("text").load(paths : _*).select("value")
+      .as[String](sparkSession.implicits.newStringEncoder)
   }

   ///////////////////////////////////////////////////////////////////////////////////////
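
The added `select("value")` matters because partition discovery can widen the loaded schema, and a multi-column row cannot be encoded as a plain String. A sketch of the failure mode this projection avoids (spark-shell assumed; the printed schema is illustrative):

```scala
// Loading a partitioned directory discovers extra columns:
val loaded = spark.read.format("text").load("/tmp/logs")
loaded.printSchema()
// root
//  |-- value: string (nullable = true)
//  |-- year: string (nullable = true)

// Without the projection, .as[String] would fail on the two-column schema;
// selecting "value" first makes the String encoder applicable again.
val ds = loaded.select("value").as[String]
```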
@@ -83,19 +83,6 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     }
   }

-  override private[sql] def buildReaderWithPartitionValues(
-      sparkSession: SparkSession,
-      dataSchema: StructType,
-      partitionSchema: StructType,
-      requiredSchema: StructType,
-      filters: Seq[Filter],
-      options: Map[String, String],
-      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    // Text data source doesn't support partitioning. Here we simply delegate to `buildReader`.
-    buildReader(
-      sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
-  }
-
   override def buildReader(
       sparkSession: SparkSession,
       dataSchema: StructType,

@@ -152,4 +139,3 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext)
     recordWriter.close(context)
   }
 }
-
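
With the override gone, the text source inherits FileFormat's default `buildReaderWithPartitionValues`, which appends each file's partition values to every data row. A conceptual sketch of that appending step (a hypothetical helper using the public Row API, not Spark's actual internals):

```scala
import org.apache.spark.sql.Row

// Append the partition values carried by a file (e.g. year = "2015") to
// every row read from that file.
def appendPartitionValues(dataRows: Iterator[Row], partitionValues: Row): Iterator[Row] =
  dataRows.map(row => Row.fromSeq(row.toSeq ++ partitionValues.toSeq))

// Example: Row("2015-test") plus partition Row("2015") yields Row("2015-test", "2015").
val appended = appendPartitionValues(Iterator(Row("2015-test")), Row("2015"))
```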
New test data files (one line each) under text-partitioned/year=2014/ and text-partitioned/year=2015/, matching the layout the tests below resolve via getResource("text-partitioned"):

2014-test
2015-test
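
For reference, a minimal sketch of producing an equivalent layout by hand; the part-00000 file name is an arbitrary choice, not taken from the commit:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Create text-partitioned/year=2014 and text-partitioned/year=2015, each
// holding a single-line text file. Spark's partition discovery turns the
// year=... directory names into a "year" column at read time.
for (year <- Seq("2014", "2015")) {
  val dir = Paths.get(s"text-partitioned/year=$year")
  Files.createDirectories(dir)
  Files.write(dir.resolve("part-00000"), s"$year-test\n".getBytes(StandardCharsets.UTF_8))
}
```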
@@ -65,6 +65,26 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }

+  test("reading partitioned data using read.text()") {
+    val partitionedData = Thread.currentThread().getContextClassLoader
+      .getResource("text-partitioned").toString
+    val df = spark.read.text(partitionedData)
+    val data = df.collect()
+
+    assert(df.schema == new StructType().add("value", StringType))
+    assert(data.length == 2)
+  }
+
+  test("support for partitioned reading") {
+    val partitionedData = Thread.currentThread().getContextClassLoader
+      .getResource("text-partitioned").toString
+    val df = spark.read.format("text").load(partitionedData)
+    val data = df.filter("year = '2015'").select("value").collect()
+
+    assert(data(0) == Row("2015-test"))
+    assert(data.length == 1)
+  }
+
   test("SPARK-13503 Support to specify the option for compression codec for TEXT") {
     val testDf = spark.read.text(testFile)
     val extensionNameMap = Map("bzip2" -> ".bz2", "deflate" -> ".deflate", "gzip" -> ".gz")
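
Taken together, the intended user-facing behavior after this change, sketched against the same test layout (spark-shell session assumed):

```scala
import org.apache.spark.sql.Dataset

// Partition-aware read: the DataFrame carries value and year, so filters
// on the partition column work again (SPARK-15323).
val df = spark.read.format("text").load("text-partitioned")
df.filter("year = '2015'").select("value").show()

// read.text keeps its Dataset[String] contract: the new select("value")
// drops the partition column before the String encoder is applied (SPARK-14463).
val lines: Dataset[String] = spark.read.text("text-partitioned")
lines.show()
```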