Commit 00a2911c authored by Cheng Lian, committed by Yin Huai

[SPARK-10540] Fixes flaky all-data-type test

This PR breaks the original test case into multiple ones (one test case per data type), which makes test failure output much more readable.

Within each test case, we build a table with two columns: one holds values of the data type under test, and the other is an "index" column, which is used to sort the DataFrame and work around SPARK-10591 [1].

[1]: https://issues.apache.org/jira/browse/SPARK-10591
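
For illustration, here is a minimal standalone sketch of that two-column layout. It is a sketch only: it assumes a Spark 1.5-style sqlContext in scope, and StringType stands in for whichever data type is under test; the actual tests appear in the diff below.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    // One column for the type under test, plus an "index" column that pins a
    // deterministic row order across the write/read round trip (SPARK-10591).
    val schema = new StructType()
      .add("index", IntegerType, nullable = false)
      .add("col", StringType, nullable = true)  // stand-in for the type under test

    val rows = (1 to 10).map(i => Row(i, s"value-$i"))
    val rdd = sqlContext.sparkContext.parallelize(rows)

    // Sorting by "index" and coalescing to a single partition keeps later
    // comparisons insensitive to non-deterministic row ordering.
    val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)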

Author: Cheng Lian <lian@databricks.com>

Closes #8768 from liancheng/spark-10540/test-all-data-types.
parent 35e8ab93
@@ -100,80 +100,57 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  ignore("test all data types") {
-    withTempPath { file =>
-      // Create the schema.
-      val struct =
-        StructType(
-          StructField("f1", FloatType, true) ::
-          StructField("f2", ArrayType(BooleanType), true) :: Nil)
-      // TODO: add CalendarIntervalType to here once we can save it out.
-      val dataTypes =
-        Seq(
-          StringType, BinaryType, NullType, BooleanType,
-          ByteType, ShortType, IntegerType, LongType,
-          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-          DateType, TimestampType,
-          ArrayType(IntegerType), MapType(StringType, LongType), struct,
-          new MyDenseVectorUDT())
-      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-        StructField(s"col$index", dataType, nullable = true)
-      }
-      val schema = StructType(fields)
-      // Generate data at the driver side. We need to materialize the data first and then
-      // create RDD.
-      val maybeDataGenerator =
-        RandomDataGenerator.forType(
-          dataType = schema,
+  private val supportedDataTypes = Seq(
+    StringType, BinaryType,
+    NullType, BooleanType,
+    ByteType, ShortType, IntegerType, LongType,
+    FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+    DateType, TimestampType,
+    ArrayType(IntegerType),
+    MapType(StringType, LongType),
+    new StructType()
+      .add("f1", FloatType, nullable = true)
+      .add("f2", ArrayType(BooleanType, containsNull = true), nullable = true),
+    new MyDenseVectorUDT()
+  ).filter(supportsDataType)
+
+  for (dataType <- supportedDataTypes) {
+    test(s"test all data types - $dataType") {
+      withTempPath { file =>
+        val path = file.getCanonicalPath
+        val dataGenerator = RandomDataGenerator.forType(
+          dataType = dataType,
           nullable = true,
-          seed = Some(System.nanoTime()))
-      val dataGenerator =
-        maybeDataGenerator
-          .getOrElse(fail(s"Failed to create data generator for schema $schema"))
-      val data = (1 to 10).map { i =>
-        dataGenerator.apply() match {
-          case row: Row => row
-          case null => Row.fromSeq(Seq.fill(schema.length)(null))
-          case other =>
-            fail(s"Row or null is expected to be generated, " +
-              s"but a ${other.getClass.getCanonicalName} is generated.")
+          seed = Some(System.nanoTime())
+        ).getOrElse {
+          fail(s"Failed to create data generator for schema $dataType")
         }
-      }
 
-      // Create a DF for the schema with random data.
-      val rdd = sqlContext.sparkContext.parallelize(data, 10)
-      val df = sqlContext.createDataFrame(rdd, schema)
+        // Create a DF for the schema with random data. The index field is used to sort the
+        // DataFrame. This is a workaround for SPARK-10591.
+        val schema = new StructType()
+          .add("index", IntegerType, nullable = false)
+          .add("col", dataType, nullable = true)
+        val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+        val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
 
-      // All columns that have supported data types of this source.
-      val supportedColumns = schema.fields.collect {
-        case StructField(name, dataType, _, _) if supportsDataType(dataType) => name
-      }
-      val selectedColumns = util.Random.shuffle(supportedColumns.toSeq)
-      val dfToBeSaved = df.selectExpr(selectedColumns: _*)
-
-      // Save the data out.
-      dfToBeSaved
-        .write
-        .format(dataSourceName)
-        .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-        .save(file.getCanonicalPath)
+        df.write
+          .mode("overwrite")
+          .format(dataSourceName)
+          .option("dataSchema", df.schema.json)
+          .save(path)
 
-      val loadedDF =
-        sqlContext
+        val loadedDF = sqlContext
           .read
           .format(dataSourceName)
-          .schema(dfToBeSaved.schema)
-          .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-          .load(file.getCanonicalPath)
-          .selectExpr(selectedColumns: _*)
+          .option("dataSchema", df.schema.json)
+          .schema(df.schema)
+          .load(path)
+          .orderBy("index")
 
-      // Read the data back.
-      checkAnswer(
-        loadedDF,
-        dfToBeSaved
-      )
+        checkAnswer(loadedDF, df)
+      }
     }
   }
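
A note on the structure above: registering one test per data type is plain ScalaTest. Tests created in the class body at construction time each get their own name and failure report. A minimal sketch of that idiom follows; the suite name and type list here are hypothetical, and the real suite extends QueryTest with SQLTestUtils rather than plain FunSuite.

    import org.apache.spark.sql.types._
    import org.scalatest.FunSuite

    // Hypothetical stripped-down suite illustrating per-type test registration.
    class AllDataTypesSuiteSketch extends FunSuite {
      private val typesToTest = Seq(StringType, IntegerType, DoubleType)

      // Each iteration registers a separate test case, so a failure names the
      // offending type directly, e.g. "test all data types - IntegerType".
      for (dataType <- typesToTest) {
        test(s"test all data types - $dataType") {
          // ... build a two-column DataFrame of `dataType`, save, reload, compare ...
        }
      }
    }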