From 1bbf9ff634745148e782370009aa31d3a042638c Mon Sep 17 00:00:00 2001
From: Michael Allman <michael@videoamp.com>
Date: Tue, 1 Nov 2016 22:20:19 -0700
Subject: [PATCH] [SPARK-17992][SQL] Return all partitions from HiveShim when
 Hive throws a metastore exception while attempting to fetch partitions by
 filter

(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17992)

## What changes were proposed in this pull request?

We recently added table partition pruning for partitioned Hive tables converted to use `TableFileCatalog`. When the Hive configuration option `hive.metastore.try.direct.sql` is set to `false`, Hive will throw an exception for unsupported filter expressions. For example, attempting to filter on an integer partition column will throw an `org.apache.hadoop.hive.metastore.api.MetaException`.

I discovered this behavior because VideoAmp uses the CDH version of Hive with a PostgreSQL metastore DB. In this configuration, CDH sets `hive.metastore.try.direct.sql` to `false` by default, and queries that filter on a non-string partition column will fail.

Rather than propagate this exception during query planning, this patch catches it, logs a warning, and returns all table partitions instead. Clients of this method are already expected to handle the possibility that the filters will not be honored.

## How was this patch tested?

A unit test was added.

Author: Michael Allman <michael@videoamp.com>

Closes #15673 from mallman/spark-17992-catch_hive_partition_filter_exception.
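To make the "superset" contract above concrete: a caller that asked the metastore to filter partitions must be prepared to receive unfiltered results and re-apply its predicate client-side. The following is an illustrative sketch only, not code from this patch; `Partition` and `prunePartitions` here are hypothetical stand-ins for the real catalyst and Hive types:

```scala
// Sketch of the client-side pruning contract. If the metastore falls back to
// returning every partition, re-applying the predicate restores correctness;
// only performance is lost.
case class Partition(spec: Map[String, String])

def prunePartitions(
    candidates: Seq[Partition],
    predicate: Map[String, String] => Boolean): Seq[Partition] =
  candidates.filter(p => predicate(p.spec))

object PruningDemo extends App {
  // Suppose the metastore ignored our `part = 3` filter and returned all five:
  val all = (1 to 5).map(i => Partition(Map("part" -> i.toString)))
  // The caller re-filters and still ends up with exactly the partition it wanted.
  val wanted = prunePartitions(all, spec => spec("part") == "3")
  assert(wanted.map(_.spec("part")) == Seq("3"))
  println(s"pruned ${all.size} candidate partitions down to ${wanted.size}")
}
```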
---
 .../spark/sql/hive/client/HiveShim.scala      | 31 ++++++--
 .../sql/hive/client/HiveClientBuilder.scala   | 56 ++++++++++++++
 .../sql/hive/client/HiveClientSuite.scala     | 61 +++++++++++++++
 .../spark/sql/hive/client/VersionsSuite.scala | 77 +++++--------------
 4 files changed, 160 insertions(+), 65 deletions(-)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 85edaf63db..3d9642dd14 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, NoSuchObjectException, PrincipalType, ResourceType, ResourceUri}
+import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegralType, StringType}
 import org.apache.spark.util.Utils
 
@@ -586,17 +587,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
     } else {
       logDebug(s"Hive metastore filter is '$filter'.")
+      val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
+      val tryDirectSql =
+        hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal)
       try {
+        // Hive may throw an exception when calling this method in some circumstances, such as
+        // when filtering on a non-string partition column when the hive config key
+        // hive.metastore.try.direct.sql is false
         getPartitionsByFilterMethod.invoke(hive, table, filter)
           .asInstanceOf[JArrayList[Partition]]
       } catch {
-        case e: InvocationTargetException =>
-          // SPARK-18167 retry to investigate the flaky test. This should be reverted before
-          // the release is cut.
-          val retry = Try(getPartitionsByFilterMethod.invoke(hive, table, filter))
-          logError("getPartitionsByFilter failed, retry success = " + retry.isSuccess)
-          logError("all partitions: " + getAllPartitions(hive, table))
-          throw e
+        case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
+            !tryDirectSql =>
+          logWarning("Caught Hive MetaException attempting to get partition metadata by " +
+            "filter from Hive. Falling back to fetching all partition metadata, which will " +
+            "degrade performance. Modifying your Hive metastore configuration to set " +
+            s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
+          // HiveShim clients are expected to handle a superset of the requested partitions
+          getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
+        case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
+            tryDirectSql =>
+          throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
+            "metadata by filter from Hive. You can set the Spark configuration setting " +
+            s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to false to work around this " +
+            "problem, however this will result in degraded performance. Please report a bug: " +
+            "https://issues.apache.org/jira/browse/SPARK", ex)
       }
     }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
new file mode 100644
index 0000000000..591a968c82
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.client
+
+import java.io.File
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.util.VersionInfo
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+private[client] class HiveClientBuilder {
+  private val sparkConf = new SparkConf()
+
+  // In order to speed up test execution during development or in Jenkins, you can specify the path
+  // of an existing Ivy cache:
+  private val ivyPath: Option[String] = {
+    sys.env.get("SPARK_VERSIONS_SUITE_IVY_PATH").orElse(
+      Some(new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath))
+  }
+
+  private def buildConf() = {
+    lazy val warehousePath = Utils.createTempDir()
+    lazy val metastorePath = Utils.createTempDir()
+    metastorePath.delete()
+    Map(
+      "javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true",
+      "hive.metastore.warehouse.dir" -> warehousePath.toString)
+  }
+
+  def buildClient(version: String, hadoopConf: Configuration): HiveClient = {
+    IsolatedClientLoader.forVersion(
+      hiveMetastoreVersion = version,
+      hadoopVersion = VersionInfo.getVersion,
+      sparkConf = sparkConf,
+      hadoopConf = hadoopConf,
+      config = buildConf(),
+      ivyPath = ivyPath).createClient()
+  }
+}

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
new file mode 100644
index 0000000000..4790331168
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.client
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.conf.HiveConf
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.types.IntegerType
+
+class HiveClientSuite extends SparkFunSuite {
+  private val clientBuilder = new HiveClientBuilder
+
+  private val tryDirectSqlKey = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname
+
+  test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") {
+    val testPartitionCount = 5
+
+    val storageFormat = CatalogStorageFormat(
+      locationUri = None,
+      inputFormat = None,
+      outputFormat = None,
+      serde = None,
+      compressed = false,
+      properties = Map.empty)
+
+    val hadoopConf = new Configuration()
+    hadoopConf.setBoolean(tryDirectSqlKey, false)
+    val client = clientBuilder.buildClient(HiveUtils.hiveExecutionVersion, hadoopConf)
+    client.runSqlHive("CREATE TABLE test (value INT) PARTITIONED BY (part INT)")
+
+    val partitions = (1 to testPartitionCount).map { part =>
+      CatalogTablePartition(Map("part" -> part.toString), storageFormat)
+    }
+    client.createPartitions(
+      "default", "test", partitions, ignoreIfExists = false)
+
+    val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
+      Seq(EqualTo(AttributeReference("part", IntegerType)(), Literal(3))))
+
+    assert(filteredPartitions.size == testPartitionCount)
+  }
+}

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 9a10957c8e..081b0ed9bd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -23,9 +23,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.util.VersionInfo
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -48,46 +47,19 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
 @ExtendedHiveTest
 class VersionsSuite extends SparkFunSuite with Logging {
-  private val sparkConf = new SparkConf()
-
-  // In order to speed up test execution during development or in Jenkins, you can specify the path
-  // of an existing Ivy cache:
-  private val ivyPath: Option[String] = {
-    sys.env.get("SPARK_VERSIONS_SUITE_IVY_PATH").orElse(
-      Some(new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath))
-  }
-
-  private def buildConf() = {
-    lazy val warehousePath = Utils.createTempDir()
-    lazy val metastorePath = Utils.createTempDir()
-    metastorePath.delete()
-    Map(
-      "javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true",
-      "hive.metastore.warehouse.dir" -> warehousePath.toString)
-  }
+  private val clientBuilder = new HiveClientBuilder
+  import clientBuilder.buildClient
 
   test("success sanity check") {
-    val badClient = IsolatedClientLoader.forVersion(
-      hiveMetastoreVersion = HiveUtils.hiveExecutionVersion,
-      hadoopVersion = VersionInfo.getVersion,
-      sparkConf = sparkConf,
-      hadoopConf = new Configuration(),
-      config = buildConf(),
-      ivyPath = ivyPath).createClient()
+    val badClient = buildClient(HiveUtils.hiveExecutionVersion, new Configuration())
     val db = new CatalogDatabase("default", "desc", "loc", Map())
     badClient.createDatabase(db, ignoreIfExists = true)
   }
 
   test("hadoop configuration preserved") {
-    val hadoopConf = new Configuration();
+    val hadoopConf = new Configuration()
     hadoopConf.set("test", "success")
-    val client = IsolatedClientLoader.forVersion(
-      hiveMetastoreVersion = HiveUtils.hiveExecutionVersion,
-      hadoopVersion = VersionInfo.getVersion,
-      sparkConf = sparkConf,
-      hadoopConf = hadoopConf,
-      config = buildConf(),
-      ivyPath = ivyPath).createClient()
+    val client = buildClient(HiveUtils.hiveExecutionVersion, hadoopConf)
     assert("success" === client.getConf("test", null))
   }
 
@@ -109,15 +81,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
   // TODO: currently only works on mysql where we manually create the schema...
   ignore("failure sanity check") {
     val e = intercept[Throwable] {
-      val badClient = quietly {
-        IsolatedClientLoader.forVersion(
-          hiveMetastoreVersion = "13",
-          hadoopVersion = VersionInfo.getVersion,
-          sparkConf = sparkConf,
-          hadoopConf = new Configuration(),
-          config = buildConf(),
-          ivyPath = ivyPath).createClient()
-      }
+      val badClient = quietly { buildClient("13", new Configuration()) }
     }
     assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
   }
@@ -130,16 +94,9 @@ class VersionsSuite extends SparkFunSuite with Logging {
     test(s"$version: create client") {
       client = null
       System.gc() // Hack to avoid SEGV on some JVM versions.
-      val hadoopConf = new Configuration();
+      val hadoopConf = new Configuration()
       hadoopConf.set("test", "success")
-      client =
-        IsolatedClientLoader.forVersion(
-          hiveMetastoreVersion = version,
-          hadoopVersion = VersionInfo.getVersion,
-          sparkConf = sparkConf,
-          hadoopConf = hadoopConf,
-          config = buildConf(),
-          ivyPath = ivyPath).createClient()
+      client = buildClient(version, hadoopConf)
     }
 
     def table(database: String, tableName: String): CatalogTable = {
@@ -287,15 +244,19 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)")
     }
 
+    val testPartitionCount = 2
+
     test(s"$version: createPartitions") {
-      val partition1 = CatalogTablePartition(Map("key1" -> "1", "key2" -> "1"), storageFormat)
-      val partition2 = CatalogTablePartition(Map("key1" -> "1", "key2" -> "2"), storageFormat)
+      val partitions = (1 to testPartitionCount).map { key2 =>
+        CatalogTablePartition(Map("key1" -> "1", "key2" -> key2.toString), storageFormat)
+      }
       client.createPartitions(
-        "default", "src_part", Seq(partition1, partition2), ignoreIfExists = true)
+        "default", "src_part", partitions, ignoreIfExists = true)
     }
 
     test(s"$version: getPartitions(catalogTable)") {
-      assert(2 == client.getPartitions(client.getTable("default", "src_part")).size)
+      assert(testPartitionCount ==
+        client.getPartitions(client.getTable("default", "src_part")).size)
     }
 
     test(s"$version: getPartitionsByFilter") {
@@ -306,6 +267,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
       // Hive 0.12 doesn't support getPartitionsByFilter, it ignores the filter condition.
if (version != "0.12") { assert(result.size == 1) + } else { + assert(result.size == testPartitionCount) } } @@ -327,7 +290,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } test(s"$version: getPartitions(db: String, table: String)") { - assert(2 == client.getPartitions("default", "src_part", None).size) + assert(testPartitionCount == client.getPartitions("default", "src_part", None).size) } test(s"$version: loadPartition") { -- GitLab
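For users who hit the strict path above (a `MetaException` with `hive.metastore.try.direct.sql=true`), the new error message suggests disabling Spark's Hive filesource partition management. A hedged sketch of applying that workaround, assuming `SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key` resolves to `spark.sql.hive.manageFilesourcePartitions` (its value at the time of this patch):

```scala
// Workaround sketch (assumption: the conf key below is the value of
// SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key). Disabling filesource
// partition management avoids the metastore filter pushdown entirely,
// at the cost of partition-pruning performance.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.hive.manageFilesourcePartitions", "false")
  .getOrCreate()
```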