Commit 20a4d7db authored by Cheng Lian

[SPARK-8501] [SQL] Avoids reading schema from empty ORC files

ORC writes an empty schema (`struct<>`) to ORC files that contain zero rows. This is fine for Hive, since the table schema is managed by the metastore, but it causes trouble when reading raw ORC files via Spark SQL, where we have to discover the schema from the files themselves.

Note that the ORC data source itself never writes empty ORC files, but the problem still shows up when reading Hive tables that contain empty part-files.
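
To make the failure mode concrete, the sketch below (adapted from the regression test added in this patch; the table name `empty_orc`, the temp table `empty`, and `path` are illustrative placeholders) writes a single empty ORC part-file through Hive's ORC SerDe and then points the ORC data source at the same directory:

    // Assumes a HiveContext available as `sqlContext` and a scratch directory `path`.
    import sqlContext.implicits._

    sqlContext.sql(
      s"""CREATE TABLE empty_orc(key INT, value STRING)
         |STORED AS ORC
         |LOCATION '$path'
       """.stripMargin)

    // Inserting an empty DataFrame through the Hive ORC SerDe leaves behind a zero-row
    // ORC file whose schema is just `struct<>`.
    Seq.empty[(Int, String)].toDF("key", "value").coalesce(1).registerTempTable("empty")
    sqlContext.sql("INSERT INTO TABLE empty_orc SELECT key, value FROM empty")

    // Schema discovery over this directory now sees only the empty schema; with this patch
    // the ORC data source skips such files, or fails with an explicit AnalysisException when
    // no non-empty file exists, instead of silently using `struct<>`.
    sqlContext.read.format("orc").load(path)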

Author: Cheng Lian <lian@databricks.com>

Closes #7199 from liancheng/spark-8501 and squashes the following commits:

bb8cd95 [Cheng Lian] Addresses comments
a290221 [Cheng Lian] Avoids reading schema from empty ORC files
parent dfd8bac8
@@ -24,30 +24,70 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveMetastoreTypes
 import org.apache.spark.sql.types.StructType
 
 private[orc] object OrcFileOperator extends Logging {
-  def getFileReader(pathStr: String, config: Option[Configuration] = None): Reader = {
-    val conf = config.getOrElse(new Configuration)
-    val fspath = new Path(pathStr)
-    val fs = fspath.getFileSystem(conf)
-    val orcFiles = listOrcFiles(pathStr, conf)
-    logDebug(s"Creating ORC Reader from ${orcFiles.head}")
-    // TODO Need to consider all files when schema evolution is taken into account.
-    OrcFile.createReader(fs, orcFiles.head)
+  /**
+   * Retrieves an ORC file reader from a given path.  The path can point to either a directory or
+   * a single ORC file.  If it points to a directory, it picks any non-empty ORC file within that
+   * directory.
+   *
+   * The reader returned by this method is mainly used for two purposes:
+   *
+   * 1. Retrieving file metadata (schema, compression codecs, etc.)
+   * 2. Reading the actual file content (in this case, the given path should point to the target file)
+   *
+   * @note As recorded by SPARK-8501, ORC writes an empty schema (`struct<>`) to an ORC file if
+   *       the file contains zero rows.  This is OK for Hive since the schema of the table is
+   *       managed by the metastore.  But it becomes a problem when reading ORC files directly
+   *       from HDFS via Spark SQL, because we have to discover the schema from raw ORC files.
+   *       So this method always tries to find an ORC file whose schema is non-empty, and creates
+   *       the result reader from that file.  If no such file is found, it returns `None`.
+   *
+   * @todo Needs to consider all files when schema evolution is taken into account.
+   */
+  def getFileReader(basePath: String, config: Option[Configuration] = None): Option[Reader] = {
+    def isWithNonEmptySchema(path: Path, reader: Reader): Boolean = {
+      reader.getObjectInspector match {
+        case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() == 0 =>
+          logInfo(
+            s"ORC file $path has empty schema, it probably contains no rows. " +
+              "Trying to read another ORC file to figure out the schema.")
+          false
+        case _ => true
+      }
+    }
+
+    val conf = config.getOrElse(new Configuration)
+    val fs = {
+      val hdfsPath = new Path(basePath)
+      hdfsPath.getFileSystem(conf)
+    }
+
+    listOrcFiles(basePath, conf).iterator.map { path =>
+      path -> OrcFile.createReader(fs, path)
+    }.collectFirst {
+      case (path, reader) if isWithNonEmptySchema(path, reader) => reader
+    }
   }
 
   def readSchema(path: String, conf: Option[Configuration]): StructType = {
-    val reader = getFileReader(path, conf)
+    val reader = getFileReader(path, conf).getOrElse {
+      throw new AnalysisException(
+        s"Failed to discover schema from ORC files stored in $path. " +
+          "Probably there are either no ORC files or only empty ORC files.")
+    }
     val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
     val schema = readerInspector.getTypeName
     logDebug(s"Reading schema from file $path, got Hive schema string: $schema")
     HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
   }
 
-  def getObjectInspector(path: String, conf: Option[Configuration]): StructObjectInspector = {
-    getFileReader(path, conf).getObjectInspector.asInstanceOf[StructObjectInspector]
+  def getObjectInspector(
+      path: String, conf: Option[Configuration]): Option[StructObjectInspector] = {
+    getFileReader(path, conf).map(_.getObjectInspector.asInstanceOf[StructObjectInspector])
   }
 
   def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
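
Because getFileReader now returns `Option[Reader]` rather than a bare `Reader`, every caller has to decide what to do when no non-empty ORC file exists. A minimal sketch of the two patterns used in the rest of this patch (the `path` and `conf` values and the `buildRowIterator` helper are placeholders, not part of the actual change):

    // Pattern 1: fail fast when a schema is required (this is what readSchema does).
    val reader = OrcFileOperator.getFileReader(path, Some(conf)).getOrElse {
      throw new AnalysisException(
        s"Failed to discover schema from ORC files stored in $path.")
    }

    // Pattern 2: degrade to an empty result when an empty split is acceptable
    // (this is what OrcTableScan does with the ObjectInspector below).
    val rowIterator = OrcFileOperator.getObjectInspector(path, Some(conf))
      .map(buildRowIterator)  // hypothetical helper that materializes rows from the inspector
      .getOrElse(Iterator.empty)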
@@ -242,26 +242,34 @@ private[orc] case class OrcTableScan(
       nonPartitionKeyAttrs: Seq[(Attribute, Int)],
       mutableRow: MutableRow): Iterator[InternalRow] = {
     val deserializer = new OrcSerde
-    val soi = OrcFileOperator.getObjectInspector(path, Some(conf))
-    val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
-      case (attr, ordinal) =>
-        soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
-    }.unzip
-    val unwrappers = fieldRefs.map(unwrapperFor)
-    // Map each tuple to a row object
-    iterator.map { value =>
-      val raw = deserializer.deserialize(value)
-      var i = 0
-      while (i < fieldRefs.length) {
-        val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
-        if (fieldValue == null) {
-          mutableRow.setNullAt(fieldOrdinals(i))
-        } else {
-          unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
-        }
-        i += 1
-      }
-      mutableRow: InternalRow
-    }
+    val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf))
+
+    // SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero
+    // rows, and thus couldn't give a proper ObjectInspector. In this case we just return an empty
+    // partition since we know that this file is empty.
+    maybeStructOI.map { soi =>
+      val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
+        case (attr, ordinal) =>
+          soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
+      }.unzip
+      val unwrappers = fieldRefs.map(unwrapperFor)
+      // Map each tuple to a row object
+      iterator.map { value =>
+        val raw = deserializer.deserialize(value)
+        var i = 0
+        while (i < fieldRefs.length) {
+          val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
+          if (fieldValue == null) {
+            mutableRow.setNullAt(fieldOrdinals(i))
+          } else {
+            unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
+          }
+          i += 1
+        }
+        mutableRow: InternalRow
+      }
+    }.getOrElse {
+      Iterator.empty
+    }
   }
@@ -23,10 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.io.orc.CompressionKind
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.InternalRow
-import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 
@@ -170,7 +167,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
   test("Default compression options for writing to an ORC file") {
     withOrcFile((1 to 100).map(i => (i, s"val_$i"))) { file =>
       assertResult(CompressionKind.ZLIB) {
-        OrcFileOperator.getFileReader(file).getCompression
+        OrcFileOperator.getFileReader(file).get.getCompression
       }
     }
   }
@@ -183,21 +180,21 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "SNAPPY")
     withOrcFile(data) { file =>
       assertResult(CompressionKind.SNAPPY) {
-        OrcFileOperator.getFileReader(file).getCompression
+        OrcFileOperator.getFileReader(file).get.getCompression
       }
     }
 
     conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "NONE")
     withOrcFile(data) { file =>
       assertResult(CompressionKind.NONE) {
-        OrcFileOperator.getFileReader(file).getCompression
+        OrcFileOperator.getFileReader(file).get.getCompression
       }
     }
 
     conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "LZO")
     withOrcFile(data) { file =>
       assertResult(CompressionKind.LZO) {
-        OrcFileOperator.getFileReader(file).getCompression
+        OrcFileOperator.getFileReader(file).get.getCompression
       }
     }
   }
@@ -289,4 +286,48 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
         List(Row("same", "run_5", 100)))
     }
   }
+
+  test("SPARK-8501: Avoids discovering schema from empty ORC files") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      withTable("empty_orc") {
+        withTempTable("empty", "single") {
+          sqlContext.sql(
+            s"""CREATE TABLE empty_orc(key INT, value STRING)
+               |STORED AS ORC
+               |LOCATION '$path'
+             """.stripMargin)
+
+          val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
+          emptyDF.registerTempTable("empty")
+
+          // This creates 1 empty ORC file with the Hive ORC SerDe.  We are using this trick
+          // because the Spark SQL ORC data source always avoids writing empty ORC files.
+          sqlContext.sql(
+            s"""INSERT INTO TABLE empty_orc
+               |SELECT key, value FROM empty
+             """.stripMargin)
+
+          val errorMessage = intercept[AnalysisException] {
+            sqlContext.read.format("orc").load(path)
+          }.getMessage
+
+          assert(errorMessage.contains("Failed to discover schema from ORC files"))
+
+          val singleRowDF = Seq((0, "foo")).toDF("key", "value").coalesce(1)
+          singleRowDF.registerTempTable("single")
+
+          sqlContext.sql(
+            s"""INSERT INTO TABLE empty_orc
+               |SELECT key, value FROM single
+             """.stripMargin)
+
+          val df = sqlContext.read.format("orc").load(path)
+          assert(df.schema === singleRowDF.schema.asNullable)
+          checkAnswer(df, singleRowDF)
+        }
+      }
+    }
+  }
 }
@@ -43,14 +43,8 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
     orcTableDir.mkdir()
     import org.apache.spark.sql.hive.test.TestHive.implicits._
 
-    // Originally we were using a 10-row RDD for testing. However, when default parallelism is
-    // greater than 10 (e.g., running on a node with 32 cores), this RDD contains empty partitions,
-    // which result in empty ORC files. Unfortunately, ORC doesn't handle empty files properly and
-    // causes build failure on Jenkins, which happens to have 32 cores. Please refer to SPARK-8501
-    // for more details. To workaround this issue before fixing SPARK-8501, we simply increase row
-    // number in this RDD to avoid empty partitions.
     sparkContext
-      .makeRDD(1 to 100)
+      .makeRDD(1 to 10)
       .map(i => OrcData(i, s"part-$i"))
       .toDF()
       .registerTempTable(s"orc_temp_table")
@@ -76,35 +70,35 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("create temporary orc table") {
-    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 100).map(i => Row(i, s"part-$i")))
+      (1 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source where intField > 5"),
-      (6 to 100).map(i => Row(i, s"part-$i")))
+      (6 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
-      (1 to 100).map(i => Row(1, s"part-$i")))
+      (1 to 10).map(i => Row(1, s"part-$i")))
   }
 
   test("create temporary orc table as") {
-    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 100).map(i => Row(i, s"part-$i")))
+      (1 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
-      (6 to 100).map(i => Row(i, s"part-$i")))
+      (6 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
-      (1 to 100).map(i => Row(1, s"part-$i")))
+      (1 to 10).map(i => Row(1, s"part-$i")))
   }
 
   test("appending insert") {
test("appending insert") { test("appending insert") {
...@@ -112,7 +106,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { ...@@ -112,7 +106,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
checkAnswer( checkAnswer(
sql("SELECT * FROM normal_orc_source"), sql("SELECT * FROM normal_orc_source"),
(1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i => (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
Seq.fill(2)(Row(i, s"part-$i")) Seq.fill(2)(Row(i, s"part-$i"))
}) })
} }
@@ -125,7 +119,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_as_source"),
-      (6 to 100).map(i => Row(i, s"part-$i")))
+      (6 to 10).map(i => Row(i, s"part-$i")))
   }
 }