From 939e4f3d8def16dfe03f0196be8e1c218a9daa32 Mon Sep 17 00:00:00 2001 From: Reynold Xin <rxin@databricks.com> Date: Wed, 3 Jun 2015 13:57:57 -0700 Subject: [PATCH] [SPARK-8074] Parquet should throw AnalysisException during setup for data type/name related failures. Author: Reynold Xin <rxin@databricks.com> Closes #6608 from rxin/parquet-analysis and squashes the following commits: b5dc8e2 [Reynold Xin] Code review feedback. 5617cf6 [Reynold Xin] [SPARK-8074] Parquet should throw AnalysisException during setup for data type/name related failures. --- .../spark/sql/parquet/ParquetTypes.scala | 20 +++++++++---------- .../apache/spark/sql/parquet/newParquet.scala | 14 +++++++------ 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index 6698b19c74..f8a5d84549 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.parquet import java.io.IOException -import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConversions._ import scala.util.Try import org.apache.hadoop.conf.Configuration @@ -33,12 +33,11 @@ import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeNa import parquet.schema.Type.Repetition import parquet.schema.{ConversionPatterns, DecimalMetadata, GroupType => ParquetGroupType, MessageType, OriginalType => ParquetOriginalType, PrimitiveType => ParquetPrimitiveType, Type => ParquetType, Types => ParquetTypes} +import org.apache.spark.Logging +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.types._ -import org.apache.spark.{Logging, SparkException} -// Implicits -import scala.collection.JavaConversions._ /** A class representing Parquet info fields we care about, for passing back to Parquet */ private[parquet] case class ParquetTypeInfo( @@ -73,13 +72,12 @@ private[parquet] object ParquetTypesConverter extends Logging { case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType case ParquetPrimitiveTypeName.INT96 => // TODO: add BigInteger type? TODO(andre) use DecimalType instead???? - sys.error("Potential loss of precision: cannot convert INT96") + throw new AnalysisException("Potential loss of precision: cannot convert INT96") case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY if (originalType == ParquetOriginalType.DECIMAL && decimalInfo.getPrecision <= 18) => // TODO: for now, our reader only supports decimals that fit in a Long DecimalType(decimalInfo.getPrecision, decimalInfo.getScale) - case _ => sys.error( - s"Unsupported parquet datatype $parquetType") + case _ => throw new AnalysisException(s"Unsupported parquet datatype $parquetType") } } @@ -371,7 +369,7 @@ private[parquet] object ParquetTypesConverter extends Logging { parquetKeyType, parquetValueType) } - case _ => sys.error(s"Unsupported datatype $ctype") + case _ => throw new AnalysisException(s"Unsupported datatype $ctype") } } } @@ -403,7 +401,7 @@ private[parquet] object ParquetTypesConverter extends Logging { def convertFromString(string: String): Seq[Attribute] = { Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) match { case s: StructType => s.toAttributes - case other => sys.error(s"Can convert $string to row") + case other => throw new AnalysisException(s"Can convert $string to row") } } @@ -411,8 +409,8 @@ private[parquet] object ParquetTypesConverter extends Logging { // ,;{}()\n\t= and space character are special characters in Parquet schema schema.map(_.name).foreach { name => if (name.matches(".*[ ,;{}()\n\t=].*")) { - sys.error( - s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\n\t=". + throw new AnalysisException( + s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=". |Please use alias to rename it. """.stripMargin.split("\n").mkString(" ")) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 824ae36968..bf55e2383a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -39,6 +39,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD._ import org.apache.spark.rdd.RDD +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.{Row, SQLConf, SQLContext} @@ -83,7 +84,7 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext case partFilePattern(id) => id.toInt case name if name.startsWith("_") => 0 case name if name.startsWith(".") => 0 - case name => sys.error( + case name => throw new AnalysisException( s"Trying to write Parquet files to directory $outputPath, " + s"but found items with illegal name '$name'.") }.reduceOption(_ max _).getOrElse(0) @@ -380,11 +381,12 @@ private[sql] class ParquetRelation2( // time-consuming. if (dataSchema == null) { dataSchema = { - val dataSchema0 = - maybeDataSchema - .orElse(readSchema()) - .orElse(maybeMetastoreSchema) - .getOrElse(sys.error("Failed to get the schema.")) + val dataSchema0 = maybeDataSchema + .orElse(readSchema()) + .orElse(maybeMetastoreSchema) + .getOrElse(throw new AnalysisException( + s"Failed to discover schema of Parquet file(s) in the following location(s):\n" + + paths.mkString("\n\t"))) // If this Parquet relation is converted from a Hive Metastore table, must reconcile case // case insensitivity issue and possible schema mismatch (probably caused by schema -- GitLab