diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index c39017ebbfe60cb16c113b341c18985167778a72..cc0cbba275b81b0a77248dc6571c9c64c31bac44 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -132,10 +132,10 @@ case class CatalogTablePartition( /** * Given the partition schema, returns a row with that schema holding the partition values. */ - def toRow(partitionSchema: StructType, defaultTimeZoneId: String): InternalRow = { + def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = { val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties) val timeZoneId = caseInsensitiveProperties.getOrElse( - DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId) + DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId) InternalRow.fromSeq(partitionSchema.map { field => val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) { null diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index bf596fa0a89db553f1bd6f1452b97b770a1800e3..6c1592fd8881dfc85be94a7b1242241da1035f1d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -498,11 +498,6 @@ object DateTimeUtils { false } - lazy val validTimezones = TimeZone.getAvailableIDs().toSet - def isValidTimezone(timezoneId: String): Boolean = { - validTimezones.contains(timezoneId) - } - /** * Returns the microseconds since year zero (-17999) from microseconds since epoch. 
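Note on the `DateTimeUtils` hunk above: the deleted `isValidTimezone` helper was a plain set-membership check against the zone IDs the JVM reports. A minimal standalone sketch of that kind of check, outside of Spark (not part of this patch):

```scala
import java.util.TimeZone

object TimezoneValidation {
  // Cache the JVM's known zone IDs once; TimeZone.getAvailableIDs is comparatively expensive.
  private lazy val validTimezones: Set[String] = TimeZone.getAvailableIDs.toSet

  def isValidTimezone(timezoneId: String): Boolean = validTimezones.contains(timezoneId)
}
```

Dropping the helper is consistent with removing its only caller in `HiveExternalCatalog` later in this diff.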
*/ diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index dabbc2b6387e4c32e9c9ea86a2c5e1ac6d91453b..9d641b528723a6dda5d1fe6b343b6e414d53042d 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -18,9 +18,7 @@ package org.apache.spark.sql.execution.datasources.parquet; import java.io.IOException; -import java.util.TimeZone; -import org.apache.hadoop.conf.Configuration; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Dictionary; @@ -32,7 +30,6 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.spark.sql.catalyst.util.DateTimeUtils; import org.apache.spark.sql.execution.vectorized.ColumnVector; -import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.DecimalType; @@ -93,30 +90,11 @@ public class VectorizedColumnReader { private final PageReader pageReader; private final ColumnDescriptor descriptor; - private final TimeZone storageTz; - private final TimeZone sessionTz; - public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader, - Configuration conf) + public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader) throws IOException { this.descriptor = descriptor; this.pageReader = pageReader; - // If the table has a timezone property, apply the correct conversions. See SPARK-12297. - // The conf is sometimes null in tests. - String sessionTzString = - conf == null ? null : conf.get(SQLConf.SESSION_LOCAL_TIMEZONE().key()); - if (sessionTzString == null || sessionTzString.isEmpty()) { - sessionTz = DateTimeUtils.defaultTimeZone(); - } else { - sessionTz = TimeZone.getTimeZone(sessionTzString); - } - String storageTzString = - conf == null ? 
null : conf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY()); - if (storageTzString == null || storageTzString.isEmpty()) { - storageTz = sessionTz; - } else { - storageTz = TimeZone.getTimeZone(storageTzString); - } this.maxDefLevel = descriptor.getMaxDefinitionLevel(); DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); @@ -311,7 +289,7 @@ public class VectorizedColumnReader { // TODO: Convert dictionary of Binaries to dictionary of Longs if (!column.isNullAt(i)) { Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v, sessionTz, storageTz)); + column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v)); } } } else { @@ -444,7 +422,7 @@ public class VectorizedColumnReader { if (defColumn.readInteger() == maxDefLevel) { column.putLong(rowId + i, // Read 12 bytes for INT96 - ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12), sessionTz, storageTz)); + ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12))); } else { column.putNull(rowId + i); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index d8974ddf247041a939ab728039931e9e251f0a2a..51bdf0f0f2291878bc7eeb37ce5bc15ecbe01056 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.parquet.column.ColumnDescriptor; @@ -96,8 +95,6 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa */ private boolean returnColumnarBatch; - private Configuration conf; - /** * The default config on whether columnarBatch should be offheap. 
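The constructor code removed from `VectorizedColumnReader` resolved a session and a storage `TimeZone` from Hadoop configuration values, falling back when a value was null or empty. A hedged Scala sketch of that fallback pattern (the environment-variable names below are placeholders, not keys Spark uses):

```scala
import java.util.TimeZone

// Resolve a zone from an optional config value, falling back when the value is missing or
// empty (mirrors the null/isEmpty checks in the removed Java constructor logic).
def resolveTimeZone(confValue: Option[String], fallback: => TimeZone): TimeZone =
  confValue.filter(_.nonEmpty).map(id => TimeZone.getTimeZone(id)).getOrElse(fallback)

val sessionTz = resolveTimeZone(sys.env.get("EXAMPLE_SESSION_TZ"), TimeZone.getDefault)
val storageTz = resolveTimeZone(sys.env.get("EXAMPLE_STORAGE_TZ"), sessionTz)
```

When the storage value is absent, both zones collapse to the session zone, which is why the removed read path only paid for a conversion when the two IDs differed.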
*/ @@ -110,7 +107,6 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException, UnsupportedOperationException { super.initialize(inputSplit, taskAttemptContext); - this.conf = taskAttemptContext.getConfiguration(); initializeInternal(); } @@ -281,7 +277,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa for (int i = 0; i < columns.size(); ++i) { if (missingColumns[i]) continue; columnReaders[i] = new VectorizedColumnReader(columns.get(i), - pages.getPageReader(columns.get(i)), conf); + pages.getPageReader(columns.get(i))); } totalCountLoadedSoFar += pages.getRowCount(); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 5843c5b56d44cbbaa9bf1ede06f051df1a51c3fd..ebf03e1bf8869753add213d1cda89268c04c713f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal import scala.util.Try +import org.apache.commons.lang3.StringEscapeUtils import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, Row, SparkSession} @@ -36,7 +37,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.util.quoteIdentifier -import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils} +import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils} import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat @@ -73,10 +74,6 @@ case class CreateTableLikeCommand( sourceTableDesc.provider } - val properties = sourceTableDesc.properties.filter { case (k, _) => - k == ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY - } - // If the location is specified, we create an external table internally. // Otherwise create a managed table. 
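The block deleted from `CreateTableLikeCommand` below copied a single key, `parquet.mr.int96.write.zone`, from the source table's properties into the new table. For illustration, the filtering pattern it relied on (the sample values are invented for the example):

```scala
// Keep only whitelisted keys when copying properties from a source table definition.
val keysToCopy = Set("parquet.mr.int96.write.zone")
val sourceProperties = Map(
  "parquet.mr.int96.write.zone" -> "UTC",   // copied
  "created.by"                  -> "alice"  // dropped
)
val copiedProperties = sourceProperties.filter { case (k, _) => keysToCopy.contains(k) }
// copiedProperties == Map("parquet.mr.int96.write.zone" -> "UTC")
```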
val tblType = if (location.isEmpty) CatalogTableType.MANAGED else CatalogTableType.EXTERNAL @@ -89,7 +86,6 @@ case class CreateTableLikeCommand( locationUri = location.map(CatalogUtils.stringToURI(_))), schema = sourceTableDesc.schema, provider = newProvider, - properties = properties, partitionColumnNames = sourceTableDesc.partitionColumnNames, bucketSpec = sourceTableDesc.bucketSpec) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 8113768cd793f99051d774fdead3ed0e10c12b20..2f3a2c62b912c1f245fa7ceca996294ece134347 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -632,6 +632,4 @@ object ParquetFileFormat extends Logging { Failure(cause) }.toOption } - - val PARQUET_TIMEZONE_TABLE_PROPERTY = "parquet.mr.int96.write.zone" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index bf395a0bef745e500aedcb3e1d7cd0df1954571d..f1a35dd8a6200fb4c14069dd75e629d3707d08a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -95,8 +95,7 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo new ParquetRecordMaterializer( parquetRequestedSchema, ParquetReadSupport.expandUDT(catalystRequestedSchema), - new ParquetSchemaConverter(conf), - conf) + new ParquetSchemaConverter(conf)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala index df041996cdea9fa387f0942ee216709566073915..4e49a0dac97c0e00e96c46392cbf71dcd2e48fe2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.datasources.parquet -import org.apache.hadoop.conf.Configuration import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer} import org.apache.parquet.schema.MessageType @@ -30,17 +29,13 @@ import org.apache.spark.sql.types.StructType * @param parquetSchema Parquet schema of the records to be read * @param catalystSchema Catalyst schema of the rows to be constructed * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters - * @param hadoopConf hadoop Configuration for passing extra params for parquet conversion */ private[parquet] class ParquetRecordMaterializer( - parquetSchema: MessageType, - catalystSchema: StructType, - schemaConverter: ParquetSchemaConverter, - hadoopConf: Configuration) + parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter) extends RecordMaterializer[UnsafeRow] { private val rootConverter = - new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, hadoopConf, NoopUpdater) + new 
ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater) override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index d52ff62d93b26c90c6ec57818bc94ecb31baf5ae..32e6c60cd9766324f378c9cce950774480abc60d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -19,12 +19,10 @@ package org.apache.spark.sql.execution.datasources.parquet import java.math.{BigDecimal, BigInteger} import java.nio.ByteOrder -import java.util.TimeZone import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import org.apache.hadoop.conf.Configuration import org.apache.parquet.column.Dictionary import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type} @@ -36,7 +34,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -120,14 +117,12 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd * @param parquetType Parquet schema of Parquet records * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined * types should have been expanded. - * @param hadoopConf a hadoop Configuration for passing any extra parameters for parquet conversion * @param updater An updater which propagates converted field values to the parent container */ private[parquet] class ParquetRowConverter( schemaConverter: ParquetSchemaConverter, parquetType: GroupType, catalystType: StructType, - hadoopConf: Configuration, updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) with Logging { @@ -266,18 +261,18 @@ private[parquet] class ParquetRowConverter( case TimestampType => // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that. - // If the table has a timezone property, apply the correct conversions. See SPARK-12297. 
- val sessionTzString = hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key) - val sessionTz = Option(sessionTzString).map(TimeZone.getTimeZone(_)) - .getOrElse(DateTimeUtils.defaultTimeZone()) - val storageTzString = hadoopConf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) - val storageTz = Option(storageTzString).map(TimeZone.getTimeZone(_)).getOrElse(sessionTz) new ParquetPrimitiveConverter(updater) { // Converts nanosecond timestamps stored as INT96 override def addBinary(value: Binary): Unit = { - val timestamp = ParquetRowConverter.binaryToSQLTimestamp(value, sessionTz = sessionTz, - storageTz = storageTz) - updater.setLong(timestamp) + assert( + value.length() == 12, + "Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, " + + s"but got a ${value.length()}-byte binary.") + + val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) + val timeOfDayNanos = buf.getLong + val julianDay = buf.getInt + updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)) } } @@ -307,7 +302,7 @@ private[parquet] class ParquetRowConverter( case t: StructType => new ParquetRowConverter( - schemaConverter, parquetType.asGroupType(), t, hadoopConf, new ParentContainerUpdater { + schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater { override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy()) }) @@ -656,7 +651,6 @@ private[parquet] class ParquetRowConverter( } private[parquet] object ParquetRowConverter { - def binaryToUnscaledLong(binary: Binary): Long = { // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without @@ -679,35 +673,12 @@ private[parquet] object ParquetRowConverter { unscaled } - /** - * Converts an int96 to a SQLTimestamp, given both the storage timezone and the local timezone. - * The timestamp is really meant to be interpreted as a "floating time", but since we - * actually store it as micros since epoch, why we have to apply a conversion when timezones - * change. - * - * @param binary a parquet Binary which holds one int96 - * @param sessionTz the session timezone. This will be used to determine how to display the time, - * and compute functions on the timestamp which involve a timezone, eg. extract - * the hour. - * @param storageTz the timezone which was used to store the timestamp. This should come from the - * timestamp table property, or else assume its the same as the sessionTz - * @return a timestamp (millis since epoch) which will render correctly in the sessionTz - */ - def binaryToSQLTimestamp( - binary: Binary, - sessionTz: TimeZone, - storageTz: TimeZone): SQLTimestamp = { + def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = { assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" + s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.") val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) val timeOfDayNanos = buffer.getLong val julianDay = buffer.getInt - val utcEpochMicros = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos) - // avoid expensive time logic if possible. 
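With the conversion gone, both read paths decode INT96 values the same way. A self-contained sketch of that decoding, with the `DateTimeUtils` constants inlined (2440588 is the Julian day number of 1970-01-01):

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Parquet INT96 timestamps are 12 bytes, little-endian: 8 bytes of nanoseconds within the
// day followed by 4 bytes of Julian day number.
val JulianDayOfEpoch = 2440588L
val MicrosPerDay     = 24L * 60 * 60 * 1000 * 1000

def int96ToMicros(bytes: Array[Byte]): Long = {
  require(bytes.length == 12, s"expected 12 bytes, got ${bytes.length}")
  val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
  val timeOfDayNanos = buf.getLong
  val julianDay = buf.getInt
  (julianDay - JulianDayOfEpoch) * MicrosPerDay + timeOfDayNanos / 1000
}
```

This is effectively what `DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)` computes in the converter above.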
- if (sessionTz.getID() != storageTz.getID()) { - DateTimeUtils.convertTz(utcEpochMicros, sessionTz, storageTz) - } else { - utcEpochMicros - } + DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index 679ed8e361b74f005fe3b9428f30f8d809454b1b..38b0e33937f3cee2135b1bba0da1c9426958aa28 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.parquet import java.nio.{ByteBuffer, ByteOrder} import java.util -import java.util.TimeZone import scala.collection.JavaConverters.mapAsJavaMapConverter @@ -76,9 +75,6 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit // Reusable byte array used to write decimal values private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION)) - private var storageTz: TimeZone = _ - private var sessionTz: TimeZone = _ - override def init(configuration: Configuration): WriteContext = { val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA) this.schema = StructType.fromString(schemaString) @@ -95,19 +91,6 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit this.rootFieldWriters = schema.map(_.dataType).map(makeWriter) - // If the table has a timezone property, apply the correct conversions. See SPARK-12297. - val sessionTzString = configuration.get(SQLConf.SESSION_LOCAL_TIMEZONE.key) - sessionTz = if (sessionTzString == null || sessionTzString == "") { - TimeZone.getDefault() - } else { - TimeZone.getTimeZone(sessionTzString) - } - val storageTzString = configuration.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) - storageTz = if (storageTzString == null || storageTzString == "") { - sessionTz - } else { - TimeZone.getTimeZone(storageTzString) - } val messageType = new ParquetSchemaConverter(configuration).convert(schema) val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava @@ -195,13 +178,7 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond // precision. Nanosecond parts of timestamp values read from INT96 are simply stripped. 
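For context on the branch deleted here: when the session and storage zone IDs differed, the old code shifted the decoded microseconds so the value kept its wall clock in the session zone. A rough, illustrative version of such a shift (Spark's `DateTimeUtils.convertTz` handles DST boundaries more carefully):

```scala
import java.util.TimeZone

// Shift a microsecond timestamp so it displays in `to` with the wall clock it had in `from`.
// This naive version looks up each zone's offset at the instant itself, which is close
// enough for illustration but not exact around DST transitions.
def shiftWallClock(micros: Long, from: TimeZone, to: TimeZone): Long = {
  val millis = micros / 1000
  micros + (from.getOffset(millis) - to.getOffset(millis)) * 1000L
}
```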
- val rawMicros = row.getLong(ordinal) - val adjustedMicros = if (sessionTz.getID() == storageTz.getID()) { - rawMicros - } else { - DateTimeUtils.convertTz(rawMicros, storageTz, sessionTz) - } - val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(adjustedMicros) + val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal)) val buf = ByteBuffer.wrap(timestampBuffer) buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay) recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 8fef467f5f5cbac1e3f4d40e1bc697f1f231182a..ba48facff2933c3452eec5d0104476a2cf5929ff 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -39,10 +39,9 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.ColumnStat -import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.PartitioningUtils -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.sql.internal.StaticSQLConf._ @@ -225,14 +224,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat throw new TableAlreadyExistsException(db = db, table = table) } - val tableTz = tableDefinition.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) - tableTz.foreach { tz => - if (!DateTimeUtils.isValidTimezone(tz)) { - throw new AnalysisException(s"Cannot set" + - s" ${ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY} to invalid timezone $tz") - } - } - if (tableDefinition.tableType == VIEW) { client.createTable(tableDefinition, ignoreIfExists) } else { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index e0b565c0d79a07d606c0423de7bbc48259dc905a..6b98066cb76c826d6f27b71b53fa64f17c2841c5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._ import org.apache.spark.sql.types._ @@ -175,7 +174,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // We don't support hive bucketed tables, only ones we write out. 
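On the write side the patch keeps the plain `toJulianDay` encoding and only drops the conditional shift. For symmetry with the read-side sketch above, the encoding amounts to (constants inlined as before):

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Inverse of the read-side decoding: split microseconds since epoch into a Julian day and
// nanoseconds within that day, then emit the 12-byte little-endian INT96 layout.
val JulianDayOfEpoch = 2440588L
val MicrosPerDay     = 24L * 60 * 60 * 1000 * 1000

def microsToInt96(micros: Long): Array[Byte] = {
  val julianMicros   = micros + JulianDayOfEpoch * MicrosPerDay
  val julianDay      = (julianMicros / MicrosPerDay).toInt
  val timeOfDayNanos = (julianMicros % MicrosPerDay) * 1000
  ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
    .putLong(timeOfDayNanos).putInt(julianDay).array()
}
```

Paired with `int96ToMicros` from the earlier sketch, this round-trips exactly for timestamps in the supported range.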
bucketSpec = None, fileFormat = fileFormat, - options = options ++ getStorageTzOptions(relation))(sparkSession = sparkSession) + options = options)(sparkSession = sparkSession) val created = LogicalRelation(fsRelation, updatedTable) tableRelationCache.put(tableIdentifier, created) created @@ -202,7 +201,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log userSpecifiedSchema = Option(dataSchema), // We don't support hive bucketed tables, only ones we write out. bucketSpec = None, - options = options ++ getStorageTzOptions(relation), + options = options, className = fileType).resolveRelation(), table = updatedTable) @@ -223,13 +222,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log result.copy(output = newOutput) } - private def getStorageTzOptions(relation: CatalogRelation): Map[String, String] = { - // We add the table timezone to the relation options, which automatically gets injected into the - // hadoopConf for the Parquet Converters - val storageTzKey = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY - relation.tableMeta.properties.get(storageTzKey).map(storageTzKey -> _).toMap - } - private def inferIfNeeded( relation: CatalogRelation, options: Map[String, String], diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala index 2bfd63d9b56e6e301a0db209d669a659f60bea99..05b6059472f59bc4f65b89375f077a4d43d5d0ef 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala @@ -17,22 +17,12 @@ package org.apache.spark.sql.hive -import java.io.File -import java.net.URLDecoder import java.sql.Timestamp -import java.util.TimeZone -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.parquet.hadoop.ParquetFileReader -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName - -import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompatibilityTest, ParquetFileFormat} -import org.apache.spark.sql.functions._ +import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{StringType, StructType, TimestampType} class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHiveSingleton { /** @@ -151,369 +141,4 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi Row(Seq(Row(1))), "ARRAY<STRUCT<array_element: INT>>") } - - val testTimezones = Seq( - "UTC" -> "UTC", - "LA" -> "America/Los_Angeles", - "Berlin" -> "Europe/Berlin" - ) - // Check creating parquet tables with timestamps, writing data into them, and reading it back out - // under a variety of conditions: - // * tables with explicit tz and those without - // * altering table properties directly - // * variety of timezones, local & non-local - val sessionTimezones = testTimezones.map(_._2).map(Some(_)) ++ Seq(None) - sessionTimezones.foreach { sessionTzOpt => - val sparkSession = spark.newSession() - sessionTzOpt.foreach { tz => sparkSession.conf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, tz) } - 
testCreateWriteRead(sparkSession, "no_tz", None, sessionTzOpt) - val localTz = TimeZone.getDefault.getID() - testCreateWriteRead(sparkSession, "local", Some(localTz), sessionTzOpt) - // check with a variety of timezones. The unit tests currently are configured to always use - // America/Los_Angeles, but even if they didn't, we'd be sure to cover a non-local timezone. - testTimezones.foreach { case (tableName, zone) => - if (zone != localTz) { - testCreateWriteRead(sparkSession, tableName, Some(zone), sessionTzOpt) - } - } - } - - private def testCreateWriteRead( - sparkSession: SparkSession, - baseTable: String, - explicitTz: Option[String], - sessionTzOpt: Option[String]): Unit = { - testCreateAlterTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt) - testWriteTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt) - testReadTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt) - } - - private def checkHasTz(spark: SparkSession, table: String, tz: Option[String]): Unit = { - val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table)) - assert(tableMetadata.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) === tz) - } - - private def testCreateAlterTablesWithTimezone( - spark: SparkSession, - baseTable: String, - explicitTz: Option[String], - sessionTzOpt: Option[String]): Unit = { - test(s"SPARK-12297: Create and Alter Parquet tables and timezones; explicitTz = $explicitTz; " + - s"sessionTzOpt = $sessionTzOpt") { - val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY - withTable(baseTable, s"like_$baseTable", s"select_$baseTable", s"partitioned_$baseTable") { - // If we ever add a property to set the table timezone by default, defaultTz would change - val defaultTz = None - // check that created tables have correct TBLPROPERTIES - val tblProperties = explicitTz.map { - tz => s"""TBLPROPERTIES ($key="$tz")""" - }.getOrElse("") - spark.sql( - s"""CREATE TABLE $baseTable ( - | x int - | ) - | STORED AS PARQUET - | $tblProperties - """.stripMargin) - val expectedTableTz = explicitTz.orElse(defaultTz) - checkHasTz(spark, baseTable, expectedTableTz) - spark.sql( - s"""CREATE TABLE partitioned_$baseTable ( - | x int - | ) - | PARTITIONED BY (y int) - | STORED AS PARQUET - | $tblProperties - """.stripMargin) - checkHasTz(spark, s"partitioned_$baseTable", expectedTableTz) - spark.sql(s"CREATE TABLE like_$baseTable LIKE $baseTable") - checkHasTz(spark, s"like_$baseTable", expectedTableTz) - spark.sql( - s"""CREATE TABLE select_$baseTable - | STORED AS PARQUET - | AS - | SELECT * from $baseTable - """.stripMargin) - checkHasTz(spark, s"select_$baseTable", defaultTz) - - // check alter table, setting, unsetting, resetting the property - spark.sql( - s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="America/Los_Angeles")""") - checkHasTz(spark, baseTable, Some("America/Los_Angeles")) - spark.sql(s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""") - checkHasTz(spark, baseTable, Some("UTC")) - spark.sql(s"""ALTER TABLE $baseTable UNSET TBLPROPERTIES ($key)""") - checkHasTz(spark, baseTable, None) - explicitTz.foreach { tz => - spark.sql(s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="$tz")""") - checkHasTz(spark, baseTable, expectedTableTz) - } - } - } - } - - val desiredTimestampStrings = Seq( - "2015-12-31 22:49:59.123", - "2015-12-31 23:50:59.123", - "2016-01-01 00:39:59.123", - "2016-01-01 01:29:59.123" - ) - // We don't want to mess with timezones inside the tests themselves, since we 
use a shared - // spark context, and then we might be prone to issues from lazy vals for timezones. Instead, - // we manually adjust the timezone just to determine what the desired millis (since epoch, in utc) - // is for various "wall-clock" times in different timezones, and then we can compare against those - // in our tests. - val timestampTimezoneToMillis = { - val originalTz = TimeZone.getDefault - try { - desiredTimestampStrings.flatMap { timestampString => - Seq("America/Los_Angeles", "Europe/Berlin", "UTC").map { tzId => - TimeZone.setDefault(TimeZone.getTimeZone(tzId)) - val timestamp = Timestamp.valueOf(timestampString) - (timestampString, tzId) -> timestamp.getTime() - } - }.toMap - } finally { - TimeZone.setDefault(originalTz) - } - } - - private def createRawData(spark: SparkSession): Dataset[(String, Timestamp)] = { - import spark.implicits._ - val df = desiredTimestampStrings.toDF("display") - // this will get the millis corresponding to the display time given the current *session* - // timezone. - df.withColumn("ts", expr("cast(display as timestamp)")).as[(String, Timestamp)] - } - - private def testWriteTablesWithTimezone( - spark: SparkSession, - baseTable: String, - explicitTz: Option[String], - sessionTzOpt: Option[String]) : Unit = { - val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY - test(s"SPARK-12297: Write to Parquet tables with Timestamps; explicitTz = $explicitTz; " + - s"sessionTzOpt = $sessionTzOpt") { - - withTable(s"saveAsTable_$baseTable", s"insert_$baseTable", s"partitioned_ts_$baseTable") { - val sessionTzId = sessionTzOpt.getOrElse(TimeZone.getDefault().getID()) - // check that created tables have correct TBLPROPERTIES - val tblProperties = explicitTz.map { - tz => s"""TBLPROPERTIES ($key="$tz")""" - }.getOrElse("") - - val rawData = createRawData(spark) - // Check writing data out. - // We write data into our tables, and then check the raw parquet files to see whether - // the correct conversion was applied. - rawData.write.saveAsTable(s"saveAsTable_$baseTable") - checkHasTz(spark, s"saveAsTable_$baseTable", None) - spark.sql( - s"""CREATE TABLE insert_$baseTable ( - | display string, - | ts timestamp - | ) - | STORED AS PARQUET - | $tblProperties - """.stripMargin) - checkHasTz(spark, s"insert_$baseTable", explicitTz) - rawData.write.insertInto(s"insert_$baseTable") - // no matter what, roundtripping via the table should leave the data unchanged - val readFromTable = spark.table(s"insert_$baseTable").collect() - .map { row => (row.getAs[String](0), row.getAs[Timestamp](1)).toString() }.sorted - assert(readFromTable === rawData.collect().map(_.toString()).sorted) - - // Now we load the raw parquet data on disk, and check if it was adjusted correctly. - // Note that we only store the timezone in the table property, so when we read the - // data this way, we're bypassing all of the conversion logic, and reading the raw - // values in the parquet file. - val onDiskLocation = spark.sessionState.catalog - .getTableMetadata(TableIdentifier(s"insert_$baseTable")).location.getPath - // we test reading the data back with and without the vectorized reader, to make sure we - // haven't broken reading parquet from non-hive tables, with both readers. 
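The deleted `timestampTimezoneToMillis` helper computed expected epoch millis by temporarily resetting the JVM default timezone around `Timestamp.valueOf`. An alternative sketch that avoids mutating global state (the helper name is illustrative, not from the patch):

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

// Parse a wall-clock string as local time in `zoneId` and return epoch millis,
// without touching TimeZone.setDefault.
def wallClockToMillis(wallClock: String, zoneId: String): Long = {
  val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
  fmt.setTimeZone(TimeZone.getTimeZone(zoneId))
  fmt.parse(wallClock).getTime
}

val millis = wallClockToMillis("2015-12-31 23:50:59.123", "Europe/Berlin")
```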
- Seq(false, true).foreach { vectorized => - spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized) - val readFromDisk = spark.read.parquet(onDiskLocation).collect() - val storageTzId = explicitTz.getOrElse(sessionTzId) - readFromDisk.foreach { row => - val displayTime = row.getAs[String](0) - val millis = row.getAs[Timestamp](1).getTime() - val expectedMillis = timestampTimezoneToMillis((displayTime, storageTzId)) - assert(expectedMillis === millis, s"Display time '$displayTime' was stored " + - s"incorrectly with sessionTz = ${sessionTzOpt}; Got $millis, expected " + - s"$expectedMillis (delta = ${millis - expectedMillis})") - } - } - - // check tables partitioned by timestamps. We don't compare the "raw" data in this case, - // since they are adjusted even when we bypass the hive table. - rawData.write.partitionBy("ts").saveAsTable(s"partitioned_ts_$baseTable") - val partitionDiskLocation = spark.sessionState.catalog - .getTableMetadata(TableIdentifier(s"partitioned_ts_$baseTable")).location.getPath - // no matter what mix of timezones we use, the dirs should specify the value with the - // same time we use for display. - val parts = new File(partitionDiskLocation).list().collect { - case name if name.startsWith("ts=") => URLDecoder.decode(name.stripPrefix("ts=")) - }.toSet - assert(parts === desiredTimestampStrings.toSet) - } - } - } - - private def testReadTablesWithTimezone( - spark: SparkSession, - baseTable: String, - explicitTz: Option[String], - sessionTzOpt: Option[String]): Unit = { - val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY - test(s"SPARK-12297: Read from Parquet tables with Timestamps; explicitTz = $explicitTz; " + - s"sessionTzOpt = $sessionTzOpt") { - withTable(s"external_$baseTable", s"partitioned_$baseTable") { - // we intentionally save this data directly, without creating a table, so we can - // see that the data is read back differently depending on table properties. - // we'll save with adjusted millis, so that it should be the correct millis after reading - // back. - val rawData = createRawData(spark) - // to avoid closing over entire class - val timestampTimezoneToMillis = this.timestampTimezoneToMillis - import spark.implicits._ - val adjustedRawData = (explicitTz match { - case Some(tzId) => - rawData.map { case (displayTime, _) => - val storageMillis = timestampTimezoneToMillis((displayTime, tzId)) - (displayTime, new Timestamp(storageMillis)) - } - case _ => - rawData - }).withColumnRenamed("_1", "display").withColumnRenamed("_2", "ts") - withTempPath { basePath => - val unpartitionedPath = new File(basePath, "flat") - val partitionedPath = new File(basePath, "partitioned") - adjustedRawData.write.parquet(unpartitionedPath.getCanonicalPath) - val options = Map("path" -> unpartitionedPath.getCanonicalPath) ++ - explicitTz.map { tz => Map(key -> tz) }.getOrElse(Map()) - - spark.catalog.createTable( - tableName = s"external_$baseTable", - source = "parquet", - schema = new StructType().add("display", StringType).add("ts", TimestampType), - options = options - ) - - // also write out a partitioned table, to make sure we can access that correctly. - // add a column we can partition by (value doesn't particularly matter). - val partitionedData = adjustedRawData.withColumn("id", monotonicallyIncreasingId) - partitionedData.write.partitionBy("id") - .parquet(partitionedPath.getCanonicalPath) - // unfortunately, catalog.createTable() doesn't let us specify partitioning, so just use - // a "CREATE TABLE" stmt. 
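The deleted `createRawData` helper built (display, timestamp) pairs by casting the display string under the current session timezone. A compact sketch of the same construction, assuming a local `SparkSession`:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().master("local[1]").appName("int96-sketch").getOrCreate()
import spark.implicits._

// Casting the string to a timestamp interprets it in the session-local timezone, so the
// same display string maps to different epoch micros under different session zones.
val rawData = Seq("2015-12-31 22:49:59.123", "2016-01-01 00:39:59.123")
  .toDF("display")
  .withColumn("ts", expr("cast(display as timestamp)"))
  .as[(String, Timestamp)]
```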
- val tblOpts = explicitTz.map { tz => s"""TBLPROPERTIES ($key="$tz")""" }.getOrElse("") - spark.sql(s"""CREATE EXTERNAL TABLE partitioned_$baseTable ( - | display string, - | ts timestamp - |) - |PARTITIONED BY (id bigint) - |STORED AS parquet - |LOCATION 'file:${partitionedPath.getCanonicalPath}' - |$tblOpts - """.stripMargin) - spark.sql(s"msck repair table partitioned_$baseTable") - - for { - vectorized <- Seq(false, true) - partitioned <- Seq(false, true) - } { - withClue(s"vectorized = $vectorized; partitioned = $partitioned") { - spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized) - val sessionTz = sessionTzOpt.getOrElse(TimeZone.getDefault().getID()) - val table = if (partitioned) s"partitioned_$baseTable" else s"external_$baseTable" - val query = s"select display, cast(ts as string) as ts_as_string, ts " + - s"from $table" - val collectedFromExternal = spark.sql(query).collect() - assert( collectedFromExternal.size === 4) - collectedFromExternal.foreach { row => - val displayTime = row.getAs[String](0) - // the timestamp should still display the same, despite the changes in timezones - assert(displayTime === row.getAs[String](1).toString()) - // we'll also check that the millis behind the timestamp has the appropriate - // adjustments. - val millis = row.getAs[Timestamp](2).getTime() - val expectedMillis = timestampTimezoneToMillis((displayTime, sessionTz)) - val delta = millis - expectedMillis - val deltaHours = delta / (1000L * 60 * 60) - assert(millis === expectedMillis, s"Display time '$displayTime' did not have " + - s"correct millis: was $millis, expected $expectedMillis; delta = $delta " + - s"($deltaHours hours)") - } - - // Now test that the behavior is still correct even with a filter which could get - // pushed down into parquet. We don't need extra handling for pushed down - // predicates because (a) in ParquetFilters, we ignore TimestampType and (b) parquet - // does not read statistics from int96 fields, as they are unsigned. See - // scalastyle:off line.size.limit - // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L419 - // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L348 - // scalastyle:on line.size.limit - // - // Just to be defensive in case anything ever changes in parquet, this test checks - // the assumption on column stats, and also the end-to-end behavior. - - val hadoopConf = sparkContext.hadoopConfiguration - val fs = FileSystem.get(hadoopConf) - val parts = if (partitioned) { - val subdirs = fs.listStatus(new Path(partitionedPath.getCanonicalPath)) - .filter(_.getPath().getName().startsWith("id=")) - fs.listStatus(subdirs.head.getPath()) - .filter(_.getPath().getName().endsWith(".parquet")) - } else { - fs.listStatus(new Path(unpartitionedPath.getCanonicalPath)) - .filter(_.getPath().getName().endsWith(".parquet")) - } - // grab the meta data from the parquet file. The next section of asserts just make - // sure the test is configured correctly. 
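The deleted assertions ran every check twice, once per Parquet read path, by flipping the flag behind `SQLConf.PARQUET_VECTORIZED_READER_ENABLED`. The toggle pattern itself, with a placeholder input path:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("both-readers").getOrCreate()

// Exercise both the vectorized and parquet-mr read paths over the same files.
Seq(false, true).foreach { vectorized =>
  spark.conf.set("spark.sql.parquet.enableVectorizedReader", vectorized)
  val rows = spark.read.parquet("/tmp/example-int96-data").collect()  // placeholder path
  // compare `rows` between the two runs, or against expected values, here
}
```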
- assert(parts.size == 1) - val oneFooter = ParquetFileReader.readFooter(hadoopConf, parts.head.getPath) - assert(oneFooter.getFileMetaData.getSchema.getColumns.size === 2) - assert(oneFooter.getFileMetaData.getSchema.getColumns.get(1).getType() === - PrimitiveTypeName.INT96) - val oneBlockMeta = oneFooter.getBlocks().get(0) - val oneBlockColumnMeta = oneBlockMeta.getColumns().get(1) - val columnStats = oneBlockColumnMeta.getStatistics - // This is the important assert. Column stats are written, but they are ignored - // when the data is read back as mentioned above, b/c int96 is unsigned. This - // assert makes sure this holds even if we change parquet versions (if eg. there - // were ever statistics even on unsigned columns). - assert(columnStats.isEmpty) - - // These queries should return the entire dataset, but if the predicates were - // applied to the raw values in parquet, they would incorrectly filter data out. - Seq( - ">" -> "2015-12-31 22:00:00", - "<" -> "2016-01-01 02:00:00" - ).foreach { case (comparison, value) => - val query = - s"select ts from $table where ts $comparison '$value'" - val countWithFilter = spark.sql(query).count() - assert(countWithFilter === 4, query) - } - } - } - } - } - } - } - - test("SPARK-12297: exception on bad timezone") { - val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY - val badTzException = intercept[AnalysisException] { - spark.sql( - s"""CREATE TABLE bad_tz_table ( - | x int - | ) - | STORED AS PARQUET - | TBLPROPERTIES ($key="Blart Versenwald III") - """.stripMargin) - } - assert(badTzException.getMessage.contains("Blart Versenwald III")) - } }
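Finally, the deleted "bad timezone" test guarded a JDK quirk that is easy to forget once the catalog-side validation above is gone: `TimeZone.getTimeZone` does not reject unknown IDs, it silently falls back to GMT.

```scala
import java.util.TimeZone

// An unknown ID does not throw; it quietly degrades to GMT, which is why the removed
// catalog check compared against TimeZone.getAvailableIDs instead of relying on getTimeZone.
val tz = TimeZone.getTimeZone("Blart Versenwald III")
assert(tz.getID == "GMT")
assert(!TimeZone.getAvailableIDs.contains("Blart Versenwald III"))
```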