Commit 6c4fdbec authored by Yijie Shen, committed by Reynold Xin

[SPARK-8887] [SQL] Explicit define which data types can be used as dynamic partition columns

This PR enforces the data type requirements for dynamic partition columns by adding analysis rules.

JIRA: https://issues.apache.org/jira/browse/SPARK-8887

Author: Yijie Shen <henry.yijieshen@gmail.com>

Closes #8201 from yjshen/dynamic_partition_columns.
parent ec29f203
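In user-facing terms, the analysis rules added below make a write that partitions by a non-atomic column (an array, map, or struct) fail at analysis time with an AnalysisException instead of failing later during execution. A minimal sketch of the behavior, assuming a Spark 1.5-era sqlContext and made-up output paths:

import org.apache.spark.sql.AnalysisException
import sqlContext.implicits._

val df = Seq(
  (1, "v1", Array(1, 2, 3)),
  (2, "v2", Array(4, 5, 6))).toDF("a", "b", "c")

// Partitioning by an atomic column (IntegerType) is still allowed.
df.write.format("parquet").partitionBy("a").save("/tmp/spark-8887-ok")

// Partitioning by the ArrayType column "c" is now rejected during analysis
// with "Cannot use ArrayType(...) for partition column".
try {
  df.write.format("parquet").partitionBy("c").save("/tmp/spark-8887-bad")
} catch {
  case e: AnalysisException => println(e.getMessage)
}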
@@ -26,6 +26,7 @@ import scala.util.Try
import org.apache.hadoop.fs.Path
import org.apache.hadoop.util.Shell
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types._
@@ -270,6 +271,18 @@ private[sql] object PartitioningUtils {
private val upCastingOrder: Seq[DataType] =
Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
+  def validatePartitionColumnDataTypes(
+      schema: StructType,
+      partitionColumns: Array[String]): Unit = {
+    ResolvedDataSource.partitionColumnsSchema(schema, partitionColumns).foreach { field =>
+      field.dataType match {
+        case _: AtomicType => // OK
+        case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
+      }
+    }
+  }
/**
* Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
* types.
......
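The new check accepts Spark's atomic types (string, numeric, boolean, binary, date, timestamp) and rejects everything else, which in practice means complex types such as arrays, maps, and structs. A hedged, stand-alone restatement of the rule: the patched code matches on AtomicType and throws AnalysisException, while this sketch substitutes public type classes and IllegalArgumentException so it runs as plain user code.

import org.apache.spark.sql.types._

// Hypothetical helper mirroring validatePartitionColumnDataTypes; the name is made up.
def checkPartitionColumns(schema: StructType, partitionColumns: Seq[String]): Unit = {
  partitionColumns.foreach { col =>
    val field = schema(col)  // StructType.apply(name) fails if the column is missing
    field.dataType match {
      case _: ArrayType | _: MapType | _: StructType =>
        // The real rule throws AnalysisException here.
        throw new IllegalArgumentException(s"Cannot use ${field.dataType} for partition column")
      case _ => // atomic types such as StringType, IntegerType, DateType pass
    }
  }
}

val schema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("c", ArrayType(IntegerType))))

checkPartitionColumns(schema, Seq("a"))     // passes
// checkPartitionColumns(schema, Seq("c"))  // would throw: ArrayType is not atomic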
@@ -143,7 +143,7 @@ object ResolvedDataSource extends Logging {
new ResolvedDataSource(clazz, relation)
}
-  private def partitionColumnsSchema(
+  def partitionColumnsSchema(
schema: StructType,
partitionColumns: Array[String]): StructType = {
StructType(partitionColumns.map { col =>
@@ -179,6 +179,9 @@
val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
+    PartitioningUtils.validatePartitionColumnDataTypes(data.schema, partitionColumns)
val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
val r = dataSource.createRelation(
sqlContext,
......
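partitionColumnsSchema is made non-private above so that PartitioningUtils can reuse it for the new validation; the write path then drops the partition columns from the data schema, as the dataSchema line shows. A rough, hypothetical illustration of that split using plain StructType operations (field names invented):

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", StringType),
  StructField("year", IntegerType)))
val partitionColumns = Array("year")

// Roughly what partitionColumnsSchema selects: only the partition columns.
val partitionSchema = StructType(partitionColumns.map(col => schema(col)))

// What the dataSchema line computes: every field except the partition columns.
val dataSchema = StructType(schema.filterNot(f => partitionColumns.contains(f.name)))
// partitionSchema contains only "year"; dataSchema contains "a" and "b".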
@@ -287,7 +287,7 @@ private[sql] class DynamicPartitionWriterContainer(
PartitioningUtils.escapePathName _, StringType, Seq(Cast(c, StringType)), Seq(StringType))
val str = If(IsNull(c), Literal(defaultPartitionName), escaped)
val partitionName = Literal(c.name + "=") :: str :: Nil
-      if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName
+      if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
}
// Returns the partition path given a partition key.
......
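The expression built in this hunk produces the Hive-style directory layout for each row: name=value pairs joined with Path.SEPARATOR (the "/" string constant, which replaces the equivalent Path.SEPARATOR_CHAR.toString), with values escaped and nulls replaced by defaultPartitionName. A plain-Scala sketch of the resulting path, where escape is a stand-in for the non-public PartitioningUtils.escapePathName and the default partition name is passed in explicitly:

import org.apache.hadoop.fs.Path

// Hypothetical stand-in for PartitioningUtils.escapePathName (the real function
// percent-escapes characters that are unsafe in file system paths).
def escape(s: String): String = s

def partitionPath(values: Seq[(String, Any)], defaultPartitionName: String): String =
  values.map { case (name, value) =>
    val str = if (value == null) defaultPartitionName else escape(value.toString)
    s"$name=$str"
  }.mkString(Path.SEPARATOR)

// partitionPath(Seq("year" -> 2015, "country" -> null), "__HIVE_DEFAULT_PARTITION__")
// returns "year=2015/country=__HIVE_DEFAULT_PARTITION__"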
@@ -116,6 +116,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
// OK
}
+        PartitioningUtils.validatePartitionColumnDataTypes(r.schema, part.keySet.toArray)
// Get all input data source relations of the query.
val srcRelations = query.collect {
case LogicalRelation(src: BaseRelation) => src
@@ -138,10 +140,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
// OK
}
-      case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
+      case CreateTableUsingAsSelect(tableName, _, _, partitionColumns, mode, _, query) =>
// When the SaveMode is Overwrite, we need to check if the table is an input table of
// the query. If so, we will throw an AnalysisException to let users know it is not allowed.
-        if (catalog.tableExists(Seq(tableName))) {
+        if (mode == SaveMode.Overwrite && catalog.tableExists(Seq(tableName))) {
// Need to remove SubQuery operator.
EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match {
// Only do the check if the table is a data source table
@@ -164,6 +166,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
// OK
}
+        PartitioningUtils.validatePartitionColumnDataTypes(query.schema, partitionColumns)
case _ => // OK
}
}
......
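Two things change in the PreWriteCheck hunk above: partition column types are now validated for every CreateTableUsingAsSelect, and the pre-existing self-overwrite check is guarded so it only fires when the SaveMode is actually Overwrite. A hedged, user-level illustration of that overwrite guard (table name invented, df as in the earlier sketch):

import org.apache.spark.sql.SaveMode

// Appending to an existing data source table does not trigger the check.
df.write.format("parquet").mode(SaveMode.Append).saveAsTable("t")

// Overwriting a table with a query that also reads from it is rejected
// with an AnalysisException, as the comment in the hunk describes.
sqlContext.table("t").write.format("parquet").mode(SaveMode.Overwrite).saveAsTable("t")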
@@ -17,6 +17,8 @@
package org.apache.spark.sql.sources
import java.sql.Date
import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
@@ -553,6 +555,21 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
}
}
test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") {
val df = Seq(
(1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")),
(2, "v2", Array(4, 5, 6), Map("k2" -> "v2"), Tuple2(2, "5")),
(3, "v3", Array(7, 8, 9), Map("k3" -> "v3"), Tuple2(3, "6"))).toDF("a", "b", "c", "d", "e")
withTempDir { file =>
intercept[AnalysisException] {
df.write.format(dataSourceName).partitionBy("c", "d", "e").save(file.getCanonicalPath)
}
}
intercept[AnalysisException] {
df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
}
}
}
// This class is used to test SPARK-8578. We should not use any custom output committer when
......