From a36653c5b7b2719f8bfddf4ddfc6e1b828ac9af1 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Tue, 1 Nov 2016 23:37:03 -0700
Subject: [PATCH] [SPARK-18192] Support all file formats in structured
 streaming

## What changes were proposed in this pull request?
This patch adds support for all file formats in structured streaming sinks. The change is very small thanks to the previous refactoring onto the new internal commit protocol API: the streaming sink can now accept any `FileFormat` instead of special-casing parquet.
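
For illustration, a minimal usage sketch of what this enables (not part of the patch; the `spark` session and paths below are placeholder assumptions):

```scala
// Sketch only: with this change, the file sink accepts any built-in FileFormat
// (json, text, csv, parquet, ...), not just parquet.
// `spark` is an existing SparkSession; all paths are hypothetical.
val lines = spark.readStream.text("/tmp/stream-input")     // any streaming source works here

val query = lines.writeStream
  .format("json")                                          // previously only "parquet" was accepted
  .option("checkpointLocation", "/tmp/stream-checkpoint")  // required by the file sink
  .start("/tmp/stream-output")                             // output directory

// query.awaitTermination()
```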

## How was this patch tested?
Updated FileStreamSinkSuite to add test cases for json, text, and parquet.

Author: Reynold Xin <rxin@databricks.com>

Closes #15711 from rxin/SPARK-18192.
---
 .../execution/datasources/DataSource.scala    |  8 +--
 .../sql/streaming/FileStreamSinkSuite.scala   | 62 +++++++++----------
 2 files changed, 32 insertions(+), 38 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d980e6a15a..3f956c4276 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
@@ -37,7 +36,6 @@ import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
@@ -292,7 +290,7 @@ case class DataSource(
       case s: StreamSinkProvider =>
         s.createSink(sparkSession.sqlContext, options, partitionColumns, outputMode)
 
-      case parquet: parquet.ParquetFileFormat =>
+      case fileFormat: FileFormat =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
@@ -301,7 +299,7 @@ case class DataSource(
           throw new IllegalArgumentException(
             s"Data source $className does not support $outputMode output mode")
         }
-        new FileStreamSink(sparkSession, path, parquet, partitionColumns, options)
+        new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, options)
 
       case _ =>
         throw new UnsupportedOperationException(
@@ -516,7 +514,7 @@ case class DataSource(
           val plan = data.logicalPlan
           plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
             throw new AnalysisException(
-              s"Unable to resolve ${name} given [${plan.output.map(_.name).mkString(", ")}]")
+              s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
           }.asInstanceOf[Attribute]
         }
         // For partitioned relation r, r.schema's column ordering can be different from the column
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 902cf05344..0f140f94f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import org.apache.spark.sql._
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex}
@@ -142,42 +142,38 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
-  test("FileStreamSink - supported formats") {
-    def testFormat(format: Option[String]): Unit = {
-      val inputData = MemoryStream[Int]
-      val ds = inputData.toDS()
+  test("FileStreamSink - parquet") {
+    testFormat(None) // should not throw; the sink defaults to parquet when no format is specified
+    testFormat(Some("parquet"))
+  }
 
-      val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
-      val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+  test("FileStreamSink - text") {
+    testFormat(Some("text"))
+  }
 
-      var query: StreamingQuery = null
+  test("FileStreamSink - json") {
+    testFormat(Some("text"))
+  }
 
-      try {
-        val writer =
-          ds.map(i => (i, i * 1000))
-            .toDF("id", "value")
-            .writeStream
-        if (format.nonEmpty) {
-          writer.format(format.get)
-        }
-        query = writer
-            .option("checkpointLocation", checkpointDir)
-            .start(outputDir)
-      } finally {
-        if (query != null) {
-          query.stop()
-        }
-      }
-    }
+  def testFormat(format: Option[String]): Unit = {
+    val inputData = MemoryStream[Int]
+    val ds = inputData.toDS()
 
-    testFormat(None) // should not throw error as default format parquet when not specified
-    testFormat(Some("parquet"))
-    val e = intercept[UnsupportedOperationException] {
-      testFormat(Some("text"))
-    }
-    Seq("text", "not support", "stream").foreach { s =>
-      assert(e.getMessage.contains(s))
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+    var query: StreamingQuery = null
+
+    try {
+      val writer = ds.map(i => (i, i * 1000)).toDF("id", "value").writeStream
+      if (format.nonEmpty) {
+        writer.format(format.get)
+      }
+      query = writer.option("checkpointLocation", checkpointDir).start(outputDir)
+    } finally {
+      if (query != null) {
+        query.stop()
+      }
     }
   }
-
 }
-- 
GitLab