diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 8ffcef85668d622eb7f468e725e415742eca6f01..d7504936d90e5eccca595d6fbfdb9c01e158d8f5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -100,80 +100,57 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  ignore("test all data types") {
-    withTempPath { file =>
-      // Create the schema.
-      val struct =
-        StructType(
-          StructField("f1", FloatType, true) ::
-            StructField("f2", ArrayType(BooleanType), true) :: Nil)
-      // TODO: add CalendarIntervalType to here once we can save it out.
-      val dataTypes =
-        Seq(
-          StringType, BinaryType, NullType, BooleanType,
-          ByteType, ShortType, IntegerType, LongType,
-          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-          DateType, TimestampType,
-          ArrayType(IntegerType), MapType(StringType, LongType), struct,
-          new MyDenseVectorUDT())
-      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-        StructField(s"col$index", dataType, nullable = true)
-      }
-      val schema = StructType(fields)
-
-      // Generate data at the driver side. We need to materialize the data first and then
-      // create RDD.
-      val maybeDataGenerator =
-        RandomDataGenerator.forType(
-          dataType = schema,
+  private val supportedDataTypes = Seq(
+    StringType, BinaryType,
+    NullType, BooleanType,
+    ByteType, ShortType, IntegerType, LongType,
+    FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+    DateType, TimestampType,
+    ArrayType(IntegerType),
+    MapType(StringType, LongType),
+    new StructType()
+      .add("f1", FloatType, nullable = true)
+      .add("f2", ArrayType(BooleanType, containsNull = true), nullable = true),
+    new MyDenseVectorUDT()
+  ).filter(supportsDataType)
+
+  for (dataType <- supportedDataTypes) {
+    test(s"test all data types - $dataType") {
+      withTempPath { file =>
+        val path = file.getCanonicalPath
+
+        val dataGenerator = RandomDataGenerator.forType(
+          dataType = dataType,
           nullable = true,
-          seed = Some(System.nanoTime()))
-      val dataGenerator =
-        maybeDataGenerator
-          .getOrElse(fail(s"Failed to create data generator for schema $schema"))
-      val data = (1 to 10).map { i =>
-        dataGenerator.apply() match {
-          case row: Row => row
-          case null => Row.fromSeq(Seq.fill(schema.length)(null))
-          case other =>
-            fail(s"Row or null is expected to be generated, " +
-              s"but a ${other.getClass.getCanonicalName} is generated.")
+          seed = Some(System.nanoTime())
+        ).getOrElse {
+          fail(s"Failed to create data generator for schema $dataType")
         }
-      }
 
-      // Create a DF for the schema with random data.
-      val rdd = sqlContext.sparkContext.parallelize(data, 10)
-      val df = sqlContext.createDataFrame(rdd, schema)
+        // Create a DF for the schema with random data. The index field is used to sort the
+        // DataFrame.  This is a workaround for SPARK-10591.
+        val schema = new StructType()
+          .add("index", IntegerType, nullable = false)
+          .add("col", dataType, nullable = true)
+        val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+        val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
 
-      // All columns that have supported data types of this source.
-      val supportedColumns = schema.fields.collect {
-        case StructField(name, dataType, _, _) if supportsDataType(dataType) => name
-      }
-      val selectedColumns = util.Random.shuffle(supportedColumns.toSeq)
-
-      val dfToBeSaved = df.selectExpr(selectedColumns: _*)
-
-      // Save the data out.
-      dfToBeSaved
-        .write
-        .format(dataSourceName)
-        .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-        .save(file.getCanonicalPath)
+        df.write
+          .mode("overwrite")
+          .format(dataSourceName)
+          .option("dataSchema", df.schema.json)
+          .save(path)
 
-      val loadedDF =
-        sqlContext
+        val loadedDF = sqlContext
           .read
           .format(dataSourceName)
-          .schema(dfToBeSaved.schema)
-          .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-          .load(file.getCanonicalPath)
-          .selectExpr(selectedColumns: _*)
+          .option("dataSchema", df.schema.json)
+          .schema(df.schema)
+          .load(path)
+          .orderBy("index")
 
-      // Read the data back.
-      checkAnswer(
-        loadedDF,
-        dfToBeSaved
-      )
+        checkAnswer(loadedDF, df)
+      }
     }
   }