Commit 09a06418 authored by animesh's avatar animesh Committed by Michael Armbrust

[SPARK-8072] [SQL] Better AnalysisException for writing DataFrame with identically named columns

Adds a checkConstraints function that validates the constraints imposed on a DataFrame (or its schema) before the DataFrame is written to external storage. The check is called ahead of the write and is added to the corresponding data source APIs.
cc rxin marmbrus

Author: animesh <animesh@apache.spark>

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>

Closes #7013 from animeshbaranawal/8072 and squashes the following commits:

f70dd0e [animesh] Change IO exception to Analysis Exception
fd45e1b [animesh] 8072: Fix Style Issues
a8a964f [animesh] 8072: Improving on previous commits
3cc4d2c [animesh] Fix Style Issues
1a89115 [animesh] Fix Style Issues
98b4399 [animesh] 8072 : Moved the exception handling to ResolvedDataSource specific to parquet format
7c3d928 [animesh] 8072: Adding check to DataFrameWriter.scala
parent 7b467cc9
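
A minimal sketch of the user-visible behaviour this patch targets (the column names, the sample data, and the output path "temp" are illustrative only; it assumes a SQLContext with its implicits imported, as in the test suite further down):

// Writing a DataFrame whose schema repeats a column name now fails fast with an
// AnalysisException such as: Duplicate column(s) : "a" found, cannot save to parquet format
val df = Seq((1, 2), (3, 4)).toDF("a", "a")
df.write.format("parquet").save("temp")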
@@ -22,6 +22,7 @@ import java.io.IOException
import org.apache.hadoop.fs.Path
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
@@ -37,6 +38,17 @@ private[sql] class DefaultSource
    parameters.getOrElse("path", sys.error("'path' must be specified for json data."))
  }

  /** Constraints to be imposed on dataframe to be stored. */
  private def checkConstraints(data: DataFrame): Unit = {
    if (data.schema.fieldNames.length != data.schema.fieldNames.distinct.length) {
      val duplicateColumns = data.schema.fieldNames.groupBy(identity).collect {
        case (x, ys) if ys.length > 1 => "\"" + x + "\""
      }.mkString(", ")
      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
        s"cannot save to JSON format")
    }
  }
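
As a quick illustration of the duplicate-detection expression used above, here is the same groupBy/collect pipeline over a plain array of field names (runnable on its own outside Spark; the sample names are made up):

// Sketch of the duplicate-name detection applied to a plain Array of field names.
val fieldNames = Array("column1", "column2", "column1")
val duplicateColumns = fieldNames.groupBy(identity).collect {
  case (name, occurrences) if occurrences.length > 1 => "\"" + name + "\""
}.mkString(", ")
println(duplicateColumns)  // prints: "column1" (the quoted name that occurs more than once)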

  /** Returns a new base relation with the parameters. */
  override def createRelation(
      sqlContext: SQLContext,
@@ -63,6 +75,10 @@ private[sql] class DefaultSource
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    // check if dataframe satisfies the constraints
    // before moving forward
    checkConstraints(data)

    val path = checkPath(parameters)
    val filesystemPath = new Path(path)
    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
@@ -130,6 +146,17 @@ private[sql] class JSONRelation(
      samplingRatio,
      userSpecifiedSchema)(sqlContext)

  /** Constraints to be imposed on dataframe to be stored. */
  private def checkConstraints(data: DataFrame): Unit = {
    if (data.schema.fieldNames.length != data.schema.fieldNames.distinct.length) {
      val duplicateColumns = data.schema.fieldNames.groupBy(identity).collect {
        case (x, ys) if ys.length > 1 => "\"" + x + "\""
      }.mkString(", ")
      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
        s"cannot save to JSON format")
    }
  }

  private val useJacksonStreamingAPI: Boolean = sqlContext.conf.useJacksonStreamingAPI

  override val needConversion: Boolean = false
@@ -178,6 +205,10 @@ private[sql] class JSONRelation(
  }

  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    // check if dataframe satisfies constraints
    // before moving forward
    checkConstraints(data)

    val filesystemPath = path match {
      case Some(p) => new Path(p)
      case None =>
@@ -164,7 +164,24 @@ private[sql] class ParquetRelation2(
    }
  }

  override def dataSchema: StructType = maybeDataSchema.getOrElse(metadataCache.dataSchema)

  /** Constraints on schema of dataframe to be stored. */
  private def checkConstraints(schema: StructType): Unit = {
    if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
      val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
        case (x, ys) if ys.length > 1 => "\"" + x + "\""
      }.mkString(", ")
      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
        s"cannot save to parquet format")
    }
  }

  override def dataSchema: StructType = {
    val schema = maybeDataSchema.getOrElse(metadataCache.dataSchema)
    // check if schema satisfies the constraints
    // before moving forward
    checkConstraints(schema)
    schema
  }
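
For the Parquet relation the check runs whenever dataSchema is computed, the intent being that code consulting the schema (including the write path) hits the exception up front. A self-contained sketch of the same check applied directly to a StructType (the schema below is made up purely for illustration):

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical schema with a repeated field name, used only to exercise the check.
val schema = StructType(Seq(
  StructField("column1", IntegerType),
  StructField("column2", IntegerType),
  StructField("column1", IntegerType)))

val duplicates = schema.fieldNames.groupBy(identity).collect {
  case (name, occurrences) if occurrences.length > 1 => name
}
println(duplicates.mkString(", "))  // prints: column1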

  override private[sql] def refresh(): Unit = {
    super.refresh()
@@ -737,4 +737,28 @@ class DataFrameSuite extends QueryTest {
df.col("")
df.col("t.``")
}
test("SPARK-8072: Better Exception for Duplicate Columns") {
// only one duplicate column present
val e = intercept[org.apache.spark.sql.AnalysisException] {
val df1 = Seq((1, 2, 3), (2, 3, 4), (3, 4, 5)).toDF("column1", "column2", "column1")
.write.format("parquet").save("temp")
}
assert(e.getMessage.contains("Duplicate column(s)"))
assert(e.getMessage.contains("parquet"))
assert(e.getMessage.contains("column1"))
assert(!e.getMessage.contains("column2"))
// multiple duplicate columns present
val f = intercept[org.apache.spark.sql.AnalysisException] {
val df2 = Seq((1, 2, 3, 4, 5), (2, 3, 4, 5, 6), (3, 4, 5, 6, 7))
.toDF("column1", "column2", "column3", "column1", "column3")
.write.format("json").save("temp")
}
assert(f.getMessage.contains("Duplicate column(s)"))
assert(f.getMessage.contains("JSON"))
assert(f.getMessage.contains("column1"))
assert(f.getMessage.contains("column3"))
assert(!f.getMessage.contains("column2"))
}
}
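
One detail about the assertions above: groupBy returns a Map, whose iteration order is left unspecified by the Scala collections contract, so the duplicate names may surface in any order in the error message. The test therefore checks for each expected name with contains rather than matching the full message string. A quick sketch:

// The order in which duplicate names appear depends on Map iteration order.
val names = Array("column1", "column2", "column3", "column1", "column3")
val dups = names.groupBy(identity).collect { case (n, vs) if vs.length > 1 => n }
println(dups.mkString(", "))  // e.g. column1, column3 (order not guaranteed)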