Skip to content
Snippets Groups Projects
Commit 50c3a86f authored by Eric Liang's avatar Eric Liang Committed by Yin Huai
Browse files

[SPARK-6749] [SQL] Make metastore client robust to underlying socket connection loss

This works around a bug in the underlying RetryingMetaStoreClient (HIVE-10384) by refreshing the metastore client whenever a Thrift exception is thrown. We attempt to emulate the proper Hive behavior by retrying only as configured in the HiveConf.

Author: Eric Liang <ekl@databricks.com>

Closes #6912 from ericl/spark-6749 and squashes the following commits:

2d54b55 [Eric Liang] use conf from state
0e3a74e [Eric Liang] use shim properly
980b3e5 [Eric Liang] Fix conf parsing hive 0.14 conf.
92459b6 [Eric Liang] Work around RetryingMetaStoreClient bug
parent a458efc6
No related branches found
No related tags found
No related merge requests found
......@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
import java.io.{BufferedReader, InputStreamReader, File, PrintStream}
import java.net.URI
import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConversions._
import scala.language.reflectiveCalls
......@@ -136,12 +137,62 @@ private[hive] class ClientWrapper(
// TODO: should this be a def?
// When we create this val client, the HiveConf of it (conf) is the one associated with state.
private val client = Hive.get(conf)
// Held in a var because retryLocked replaces it with a new client after a thrift failure.
// The annotation records that accesses are meant to happen under this object's lock.
@GuardedBy("this")
private var client: Hive = Hive.get(conf)

// Retry settings come from hive's own conf for compatibility:
// the upper bound on attempts and the pause between them, in milliseconds.
private val retryLimit: Int = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES)
private val retryDelayMillis: Long = shim.getMetastoreClientConnectRetryDelayMillis(conf)
/**
 * Runs `f` while holding this object's lock, retrying when the failure looks like a thrift
 * error (as judged by `causedByThrift`), e.g. because the hive metastore is temporarily
 * unreachable.
 *
 * After every thrift failure the method sleeps for `retryDelayMillis` and then tries to replace
 * `client` with a new one. Attempts stop once `numTries` exceeds `retryLimit` or once
 * `retryLimit * retryDelayMillis` milliseconds have elapsed since entry; the most recent thrift
 * exception is then rethrown. Exceptions that are not thrift-related propagate immediately.
 *
 * @param f the computation to run; it is re-evaluated on every attempt
 * @return the result of the first evaluation of `f` that completes without a thrift failure
 */
private def retryLocked[A](f: => A): A = synchronized {
  // Hive sometimes retries internally, so set a deadline to avoid compounding delays.
  val deadline = System.nanoTime + (retryLimit * retryDelayMillis * 1e6).toLong
  var numTries = 0
  var caughtException: Exception = null
  do {
    numTries += 1
    try {
      return f
    } catch {
      case e: Exception if causedByThrift(e) =>
        caughtException = e
        // Never report a negative count: retryLimit - numTries drops below zero
        // when retryLimit is 0 (retries disabled).
        val triesRemaining = math.max(0, retryLimit - numTries)
        logWarning(
          "HiveClientWrapper got thrift exception, destroying client and retrying " +
            s"($triesRemaining tries remaining)", e)
        // Pause before asking Hive for a replacement client.
        Thread.sleep(retryDelayMillis)
        try {
          client = Hive.get(state.getConf, true)
        } catch {
          // A thrift failure while refreshing is only logged; the loop condition below
          // decides whether another attempt is made with the existing client.
          case refreshError: Exception if causedByThrift(refreshError) =>
            logWarning("Failed to refresh hive client, will retry.", refreshError)
        }
    }
  } while (numTries <= retryLimit && System.nanoTime < deadline)
  if (System.nanoTime > deadline) {
    logWarning("Deadline exceeded")
  }
  // The loop body either returned or recorded a thrift exception, so this is non-null.
  throw caughtException
}
/**
 * Reports whether `e`, or any throwable reachable through its chain of causes, carries a
 * message that mentions a TApplicationException, TProtocolException or TTransportException.
 * Throwables without a message are skipped; a null `e` yields false.
 */
private def causedByThrift(e: Throwable): Boolean = {
  val thriftPattern = "(?s).*(TApplication|TProtocol|TTransport)Exception.*"
  Iterator
    .iterate(e)(_.getCause())
    .takeWhile(_ != null)
    .exists { link =>
      val text = link.getMessage()
      text != null && text.matches(thriftPattern)
    }
}
/**
* Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
*/
private def withHiveState[A](f: => A): A = synchronized {
private def withHiveState[A](f: => A): A = retryLocked {
val original = Thread.currentThread().getContextClassLoader
// Set the thread local metastore client to the client associated with this ClientWrapper.
Hive.set(client)
......
......@@ -21,6 +21,7 @@ import java.lang.{Boolean => JBoolean, Integer => JInteger}
import java.lang.reflect.{Method, Modifier}
import java.net.URI
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
......@@ -64,6 +65,8 @@ private[client] sealed abstract class Shim {
def getDriverResults(driver: Driver): Seq[String]
/**
 * Returns the metastore client connect-retry delay read from `conf`, expressed in
 * milliseconds as the method name states. Concrete shims decide how the value is read.
 */
def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long
def loadPartition(
hive: Hive,
loadPath: Path,
......@@ -192,6 +195,10 @@ private[client] class Shim_v0_12 extends Shim {
res.toSeq
}
/**
 * Returns the metastore client connect-retry delay in milliseconds.
 *
 * This shim reads METASTORE_CLIENT_CONNECT_RETRY_DELAY as an int and scales it by 1000.
 * The factor is a Long literal so the configured value is widened before the multiplication;
 * with Int arithmetic a large setting would silently overflow before being returned as Long.
 */
override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
  conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000L
}
override def loadPartition(
hive: Hive,
loadPath: Path,
......@@ -321,6 +328,12 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
// Handle on HiveConf's "getTimeVar" taking (ConfVars, TimeUnit), looked up through findMethod
// the first time it is needed.
private lazy val getTimeVarMethod =
  findMethod(classOf[HiveConf], "getTimeVar", classOf[HiveConf.ConfVars], classOf[TimeUnit])
override def loadPartition(
hive: Hive,
......@@ -359,4 +372,10 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE)
}
/**
 * Returns the metastore client connect-retry delay in milliseconds by invoking the
 * `getTimeVar` handle on `conf` with TimeUnit.MILLISECONDS.
 */
override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
  val rawDelay = getTimeVarMethod.invoke(
    conf, HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.MILLISECONDS)
  // The reflective call hands back a boxed value; the cast unboxes it to Long.
  rawDelay.asInstanceOf[Long]
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment