Commit daa70bf1 authored by Michael Armbrust

[SPARK-6907] [SQL] Isolated client for HiveMetastore

This PR adds initial support for loading multiple versions of Hive in a single JVM and provides a common interface for extracting metadata from the `HiveMetastoreClient` for a given version.  This is accomplished by creating an isolated `ClassLoader` that operates according to the following rules:

 - __Shared Classes__: Java, Scala, logging, and Spark classes are delegated to `baseClassLoader`
  allowing the results of calls to the `ClientInterface` to be visible externally.
 - __Hive Classes__: new instances are loaded from `execJars`.  These classes are not
  accessible externally due to their custom loading.
 - __Barrier Classes__: Classes such as `ClientWrapper` are defined in Spark but must link to a specific version of Hive.  As a result, the bytecode is acquired from the Spark `ClassLoader` but a new copy is created for each instance of `IsolatedClientLoader`.
  This new instance is able to see a specific version of Hive without using reflection wherever Hive is consistent across versions. Since
  this is a unique instance, it is not visible externally other than as a generic
  `ClientInterface`, unless `isolationOn` is set to `false`. A minimal sketch of these loading rules is shown after this list.
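
A minimal sketch of the rules above (the class name `LoaderSketch` is hypothetical and the package filters are abbreviated; the real implementation is the `IsolatedClientLoader` added in this diff):

```scala
import java.net.{URL, URLClassLoader}

import org.apache.commons.io.IOUtils

// Sketch only: expresses the three loading rules with a single URLClassLoader.
class LoaderSketch(
    hiveJars: Array[URL],          // jars containing the isolated Hive version
    baseClassLoader: ClassLoader,  // the Spark classloader
    isolationOn: Boolean = true)
  extends URLClassLoader(hiveJars, ClassLoader.getSystemClassLoader.getParent) {

  // Shared classes are delegated to the Spark loader so that values returned
  // through `ClientInterface` remain visible to the caller.
  private def isShared(name: String): Boolean =
    name.startsWith("java.") || name.startsWith("scala.") ||
      name.startsWith("org.apache.spark.") || name.contains("slf4j")

  // Barrier classes (e.g. ClientWrapper): bytecode is read from the Spark
  // loader, but a fresh copy is defined here so it links against `hiveJars`.
  private def isBarrier(name: String): Boolean =
    name.startsWith("org.apache.spark.sql.hive.client.ClientWrapper")

  override def loadClass(name: String, resolve: Boolean): Class[_] = {
    val loaded = findLoadedClass(name)
    if (loaded != null) {
      loaded
    } else if (isBarrier(name) && isolationOn) {
      val path = name.replace('.', '/') + ".class"
      val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(path))
      defineClass(name, bytes, 0, bytes.length)  // unique copy per loader
    } else if (isShared(name)) {
      baseClassLoader.loadClass(name)            // shared: delegate to Spark
    } else {
      super.loadClass(name, resolve)             // Hive classes: load from jars
    }
  }
}
```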

In addition to the unit tests, I have also tested this locally against MySQL-backed instances of the Hive metastore.  I've also successfully ported Spark SQL to run with this client, but due to the size of the changes, that will come in a follow-up PR.

By default, Hive jars for the requested version are downloaded automatically from Maven to ease packaging and testing.  However, there is also support for specifying their location manually, for deployments without internet access. Both modes are sketched below.
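
A minimal sketch of both modes, assuming code that sits in the `org.apache.spark.sql.hive.client` package (the `HiveVersion` objects are `private[client]`) and a placeholder directory for pre-staged jars:

```scala
package org.apache.spark.sql.hive.client

import java.io.File

object ClientLoadingSketch {
  // Default mode: resolve Hive 0.13.1 and its dependencies from Maven, then
  // build an isolated client for it.
  val downloaded: ClientInterface = IsolatedClientLoader.forVersion("13").client

  // Manual mode: point the loader at jars already on disk (placeholder path),
  // for deployments without internet access.
  val localJars: Seq[File] = new File("/opt/hive-0.13.1/lib").listFiles().toSeq
  val manual: ClientInterface =
    new IsolatedClientLoader(version = hive.v13, execJars = localJars).client
}
```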

Author: Michael Armbrust <michael@databricks.com>

Closes #5851 from marmbrus/isolatedClient and squashes the following commits:

c72f6ac [Michael Armbrust] rxins comments
1e271fa [Michael Armbrust] [SPARK-6907][SQL] Isolated client for HiveMetastore
parent f4af9255
Showing with 1088 additions and 7 deletions
@@ -701,7 +701,7 @@ object SparkSubmit {
}
/** Provides utility functions to be used inside SparkSubmit. */
private[deploy] object SparkSubmitUtils {
private[spark] object SparkSubmitUtils {
// Exposed for testing
var printStream = SparkSubmit.printStream
......
@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
*/
class NoSuchTableException extends Exception
class NoSuchDatabaseException extends Exception
/**
* An interface for looking up relations by name. Used by an [[Analyzer]].
*/
......
@@ -17,12 +17,31 @@
package org.apache.spark.sql.catalyst
import java.io.{PrintWriter, ByteArrayOutputStream, FileInputStream, File}
import java.io._
import org.apache.spark.util.Utils
package object util {
/** Silences output to stderr or stdout for the duration of f */
def quietly[A](f: => A): A = {
val origErr = System.err
val origOut = System.out
try {
System.setErr(new PrintStream(new OutputStream {
def write(b: Int) = {}
}))
System.setOut(new PrintStream(new OutputStream {
def write(b: Int) = {}
}))
f
} finally {
System.setErr(origErr)
System.setOut(origOut)
}
}
def fileToString(file: File, encoding: String = "UTF-8"): String = {
val inStream = new FileInputStream(file)
val outStream = new ByteArrayOutputStream
@@ -42,10 +61,9 @@ package object util {
new String(outStream.toByteArray, encoding)
}
def resourceToString(
resource:String,
encoding: String = "UTF-8",
classLoader: ClassLoader = Utils.getSparkClassLoader): String = {
def resourceToBytes(
resource: String,
classLoader: ClassLoader = Utils.getSparkClassLoader): Array[Byte] = {
val inStream = classLoader.getResourceAsStream(resource)
val outStream = new ByteArrayOutputStream
try {
@@ -61,7 +79,14 @@ package object util {
finally {
inStream.close()
}
new String(outStream.toByteArray, encoding)
outStream.toByteArray
}
def resourceToString(
resource:String,
encoding: String = "UTF-8",
classLoader: ClassLoader = Utils.getSparkClassLoader): String = {
new String(resourceToBytes(resource, classLoader), encoding)
}
def stringToFile(file: File, str: String): File = {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.client
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
case class HiveDatabase(
name: String,
location: String)
abstract class TableType { val name: String }
case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
case object IndexTable extends TableType { override val name = "INDEX_TABLE" }
case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" }
case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" }
case class HiveStorageDescriptor(
location: String,
inputFormat: String,
outputFormat: String,
serde: String)
case class HivePartition(
values: Seq[String],
storage: HiveStorageDescriptor)
case class HiveColumn(name: String, hiveType: String, comment: String)
case class HiveTable(
specifiedDatabase: Option[String],
name: String,
schema: Seq[HiveColumn],
partitionColumns: Seq[HiveColumn],
properties: Map[String, String],
serdeProperties: Map[String, String],
tableType: TableType,
location: Option[String] = None,
inputFormat: Option[String] = None,
outputFormat: Option[String] = None,
serde: Option[String] = None) {
@transient
private[client] var client: ClientInterface = _
private[client] def withClient(ci: ClientInterface): this.type = {
client = ci
this
}
def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved"))
def isPartitioned: Boolean = partitionColumns.nonEmpty
def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
// Hive does not support backticks when passing names to the client.
def qualifiedName: String = s"$database.$name"
}
/**
* An externally visible interface to the Hive client. This interface is shared across both the
* internal and external classloaders for a given version of Hive and thus must expose only
* shared classes.
*/
trait ClientInterface {
/**
* Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will
* result in one string.
*/
def runSqlHive(sql: String): Seq[String]
/** Returns the names of all tables in the given database. */
def listTables(dbName: String): Seq[String]
/** Returns the name of the active database. */
def currentDatabase: String
/** Returns the metadata for the specified database, throwing an exception if it doesn't exist. */
def getDatabase(name: String): HiveDatabase = {
getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException)
}
/** Returns the metadata for a given database, or None if it doesn't exist. */
def getDatabaseOption(name: String): Option[HiveDatabase]
/** Returns the specified table, or throws [[NoSuchTableException]]. */
def getTable(dbName: String, tableName: String): HiveTable = {
getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException)
}
/** Returns the metadata for the specified table, or None if it doesn't exist. */
def getTableOption(dbName: String, tableName: String): Option[HiveTable]
/** Creates a table with the given metadata. */
def createTable(table: HiveTable): Unit
/** Updates the given table with new metadata. */
def alterTable(table: HiveTable): Unit
/** Creates a new database with the given name. */
def createDatabase(database: HiveDatabase): Unit
/** Returns all partitions for the given table. */
def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
/** Loads a static partition into an existing table. */
def loadPartition(
loadPath: String,
tableName: String,
partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean): Unit
/** Loads data into an existing table. */
def loadTable(
loadPath: String, // TODO URI
tableName: String,
replace: Boolean,
holdDDLTime: Boolean): Unit
/** Loads new dynamic partitions into an existing table. */
def loadDynamicPartitions(
loadPath: String,
tableName: String,
partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
replace: Boolean,
numDP: Int,
holdDDLTime: Boolean,
listBucketingEnabled: Boolean): Unit
/** Used for testing only. Removes all metadata from this instance of Hive. */
def reset(): Unit
}
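// A hedged usage sketch of this interface, obtained through the
// IsolatedClientLoader added elsewhere in this commit ("src" is a placeholder
// table name assumed to have been created beforehand):
object ClientInterfaceUsageSketch {
  val client: ClientInterface = IsolatedClientLoader.forVersion("13").client
  val tables: Seq[String] = client.listTables("default")
  val src: HiveTable = client.getTable("default", "src")
  val partitions: Seq[HivePartition] = src.getAllPartitions
}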
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.client
import java.io.{BufferedReader, InputStreamReader, File, PrintStream}
import java.net.URI
import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConversions._
import scala.language.reflectiveCalls
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.metastore.api.Database
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.Driver
import org.apache.spark.Logging
import org.apache.spark.sql.execution.QueryExecutionException
/**
* A class that wraps the HiveClient and converts its responses to externally visible classes.
* Note that this class is typically loaded with an internal classloader for each instantiation,
* allowing it to interact directly with a specific isolated version of Hive. Loading this class
* with the isolated classloader, however, will result in it only being visible as a ClientInterface,
* not a ClientWrapper.
*
* This class needs to interact with multiple versions of Hive, but will always be compiled with
* the 'native', execution version of Hive. Therefore, any places where hive breaks compatibility
* must use reflection after matching on `version`.
*
* @param version the version of hive, used to pick between function calls that are not compatible
*                across versions.
* @param config a collection of configuration options that will be added to the hive conf before
* opening the hive client.
*/
class ClientWrapper(
version: HiveVersion,
config: Map[String, String])
extends ClientInterface
with Logging
with ReflectionMagic {
private val conf = new HiveConf(classOf[SessionState])
config.foreach { case (k, v) =>
logDebug(s"Hive Config: $k=$v")
conf.set(k, v)
}
// Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
private val outputBuffer = new java.io.OutputStream {
var pos: Int = 0
var buffer = new Array[Int](10240)
def write(i: Int): Unit = {
buffer(pos) = i
pos = (pos + 1) % buffer.size
}
override def toString: String = {
val (end, start) = buffer.splitAt(pos)
val input = new java.io.InputStream {
val iterator = (start ++ end).iterator
def read(): Int = if (iterator.hasNext) iterator.next() else -1
}
val reader = new BufferedReader(new InputStreamReader(input))
val stringBuilder = new StringBuilder
var line = reader.readLine()
while(line != null) {
stringBuilder.append(line)
stringBuilder.append("\n")
line = reader.readLine()
}
stringBuilder.toString()
}
}
val state = {
val original = Thread.currentThread().getContextClassLoader
Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
val ret = try {
val newState = new SessionState(conf)
SessionState.start(newState)
newState.out = new PrintStream(outputBuffer, true, "UTF-8")
newState.err = new PrintStream(outputBuffer, true, "UTF-8")
newState
} finally {
Thread.currentThread().setContextClassLoader(original)
}
ret
}
private val client = Hive.get(conf)
/**
* Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
*/
private def withHiveState[A](f: => A): A = synchronized {
val original = Thread.currentThread().getContextClassLoader
Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
Hive.set(client)
version match {
case hive.v12 =>
classOf[SessionState]
.callStatic[SessionState, SessionState]("start", state)
case hive.v13 =>
classOf[SessionState]
.callStatic[SessionState, SessionState]("setCurrentSessionState", state)
}
val ret = try f finally {
Thread.currentThread().setContextClassLoader(original)
}
ret
}
override def currentDatabase: String = withHiveState {
state.getCurrentDatabase
}
override def createDatabase(database: HiveDatabase): Unit = withHiveState {
client.createDatabase(
new Database(
database.name,
"",
new File(database.location).toURI.toString,
new java.util.HashMap),
true)
}
override def getDatabaseOption(name: String): Option[HiveDatabase] = withHiveState {
Option(client.getDatabase(name)).map { d =>
HiveDatabase(
name = d.getName,
location = d.getLocationUri)
}
}
override def getTableOption(
dbName: String,
tableName: String): Option[HiveTable] = withHiveState {
logDebug(s"Looking up $dbName.$tableName")
val hiveTable = Option(client.getTable(dbName, tableName, false))
val converted = hiveTable.map { h =>
HiveTable(
name = h.getTableName,
specifiedDatabase = Option(h.getDbName),
schema = h.getCols.map(f => HiveColumn(f.getName, f.getType, f.getComment)),
partitionColumns = h.getPartCols.map(f => HiveColumn(f.getName, f.getType, f.getComment)),
properties = h.getParameters.toMap,
serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.toMap,
tableType = ManagedTable, // TODO
location = version match {
case hive.v12 => Option(h.call[URI]("getDataLocation")).map(_.toString)
case hive.v13 => Option(h.call[Path]("getDataLocation")).map(_.toString)
},
inputFormat = Option(h.getInputFormatClass).map(_.getName),
outputFormat = Option(h.getOutputFormatClass).map(_.getName),
serde = Option(h.getSerializationLib)).withClient(this)
}
converted
}
private def toInputFormat(name: String) =
Class.forName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
private def toOutputFormat(name: String) =
Class.forName(name)
.asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
private def toQlTable(table: HiveTable): metadata.Table = {
val qlTable = new metadata.Table(table.database, table.name)
qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
qlTable.setPartCols(
table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) }
table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) }
version match {
case hive.v12 =>
table.location.map(new URI(_)).foreach(u => qlTable.call[URI, Unit]("setDataLocation", u))
case hive.v13 =>
table.location
.map(new org.apache.hadoop.fs.Path(_))
.foreach(qlTable.call[Path, Unit]("setDataLocation", _))
}
table.inputFormat.map(toInputFormat).foreach(qlTable.setInputFormatClass)
table.outputFormat.map(toOutputFormat).foreach(qlTable.setOutputFormatClass)
table.serde.foreach(qlTable.setSerializationLib)
qlTable
}
override def createTable(table: HiveTable): Unit = withHiveState {
val qlTable = toQlTable(table)
client.createTable(qlTable)
}
override def alterTable(table: HiveTable): Unit = withHiveState {
val qlTable = toQlTable(table)
client.alterTable(table.qualifiedName, qlTable)
}
override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState {
val qlTable = toQlTable(hTable)
val qlPartitions = version match {
case hive.v12 =>
client.call[metadata.Table, Set[metadata.Partition]]("getAllPartitionsForPruner", qlTable)
case hive.v13 =>
client.call[metadata.Table, Set[metadata.Partition]]("getAllPartitionsOf", qlTable)
}
qlPartitions.map(_.getTPartition).map { p =>
HivePartition(
values = Option(p.getValues).map(_.toSeq).getOrElse(Seq.empty),
storage = HiveStorageDescriptor(
location = p.getSd.getLocation,
inputFormat = p.getSd.getInputFormat,
outputFormat = p.getSd.getOutputFormat,
serde = p.getSd.getSerdeInfo.getSerializationLib))
}.toSeq
}
override def listTables(dbName: String): Seq[String] = withHiveState {
client.getAllTables
}
/**
* Runs the specified SQL query using Hive.
*/
override def runSqlHive(sql: String): Seq[String] = {
val maxResults = 100000
val results = runHive(sql, maxResults)
// It is very confusing when you only get back some of the results...
if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED")
results
}
/**
* Execute the command using Hive and return the results as a sequence. Each element
* in the sequence is one row.
*/
protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState {
logDebug(s"Running hiveql '$cmd'")
if (cmd.toLowerCase.startsWith("set")) { logDebug(s"Changing config: $cmd") }
try {
val cmd_trimmed: String = cmd.trim()
val tokens: Array[String] = cmd_trimmed.split("\\s+")
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
val proc: CommandProcessor = version match {
case hive.v12 =>
classOf[CommandProcessorFactory]
.callStatic[String, HiveConf, CommandProcessor]("get", cmd_1, conf)
case hive.v13 =>
classOf[CommandProcessorFactory]
.callStatic[Array[String], HiveConf, CommandProcessor]("get", Array(tokens(0)), conf)
}
proc match {
case driver: Driver =>
val response: CommandProcessorResponse = driver.run(cmd)
// Throw an exception if there is an error in query processing.
if (response.getResponseCode != 0) {
driver.close()
throw new QueryExecutionException(response.getErrorMessage)
}
driver.setMaxRows(maxRows)
val results = version match {
case hive.v12 =>
val res = new JArrayList[String]
driver.call[JArrayList[String], Boolean]("getResults", res)
res.toSeq
case hive.v13 =>
val res = new JArrayList[Object]
driver.call[JArrayList[Object], Boolean]("getResults", res)
res.map { r =>
r match {
case s: String => s
case a: Array[Object] => a(0).asInstanceOf[String]
}
}
}
driver.close()
results
case _ =>
if (state.out != null) {
state.out.println(tokens(0) + " " + cmd_1)
}
Seq(proc.run(cmd_1).getResponseCode.toString)
}
} catch {
case e: Exception =>
logError(
s"""
|======================
|HIVE FAILURE OUTPUT
|======================
|${outputBuffer.toString}
|======================
|END HIVE FAILURE OUTPUT
|======================
""".stripMargin)
throw e
}
}
def loadPartition(
loadPath: String,
tableName: String,
partSpec: java.util.LinkedHashMap[String, String],
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean): Unit = withHiveState {
client.loadPartition(
new Path(loadPath), // TODO: Use URI
tableName,
partSpec,
replace,
holdDDLTime,
inheritTableSpecs,
isSkewedStoreAsSubdir)
}
def loadTable(
loadPath: String, // TODO URI
tableName: String,
replace: Boolean,
holdDDLTime: Boolean): Unit = withHiveState {
client.loadTable(
new Path(loadPath),
tableName,
replace,
holdDDLTime)
}
def loadDynamicPartitions(
loadPath: String,
tableName: String,
partSpec: java.util.LinkedHashMap[String, String],
replace: Boolean,
numDP: Int,
holdDDLTime: Boolean,
listBucketingEnabled: Boolean): Unit = withHiveState {
client.loadDynamicPartitions(
new Path(loadPath),
tableName,
partSpec,
replace,
numDP,
holdDDLTime,
listBucketingEnabled)
}
def reset(): Unit = withHiveState {
client.getAllTables("default").foreach { t =>
logDebug(s"Deleting table $t")
val table = client.getTable("default", t)
client.getIndexes("default", t, 255).foreach { index =>
client.dropIndex("default", t, index.getIndexName, true)
}
if (!table.isIndexTable) {
client.dropTable("default", t)
}
}
client.getAllDatabases.filterNot(_ == "default").foreach { db =>
logDebug(s"Dropping Database: $db")
client.dropDatabase(db, true, false, true)
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.client
import java.io.File
import java.net.URLClassLoader
import java.util
import scala.language.reflectiveCalls
import scala.util.Try
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.spark.Logging
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.sql.catalyst.util.quietly
/** Factory for `IsolatedClientLoader` with specific versions of hive. */
object IsolatedClientLoader {
/**
* Creates isolated Hive client loaders by downloading the requested version from maven.
*/
def forVersion(
version: String,
config: Map[String, String] = Map.empty): IsolatedClientLoader = synchronized {
val resolvedVersion = hiveVersion(version)
val files = resolvedVersions.getOrElseUpdate(resolvedVersion, downloadVersion(resolvedVersion))
new IsolatedClientLoader(hiveVersion(version), files, config)
}
def hiveVersion(version: String): HiveVersion = version match {
case "12" | "0.12" | "0.12.0" => hive.v12
case "13" | "0.13" | "0.13.0" | "0.13.1" => hive.v13
}
private def downloadVersion(version: HiveVersion): Seq[File] = {
val hiveArtifacts =
(Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde") ++
(if (version.hasBuiltinsJar) "hive-builtins" :: Nil else Nil))
.map(a => s"org.apache.hive:$a:${version.fullVersion}") :+
"com.google.guava:guava:14.0.1" :+
"org.apache.hadoop:hadoop-client:2.4.0" :+
"mysql:mysql-connector-java:5.1.12"
val classpath = quietly {
SparkSubmitUtils.resolveMavenCoordinates(
hiveArtifacts.mkString(","),
Some("http://www.datanucleus.org/downloads/maven2"),
None)
}
val allFiles = classpath.split(",").map(new File(_)).toSet
// TODO: Remove copy logic.
val tempDir = File.createTempFile("hive", "v" + version.toString)
tempDir.delete()
tempDir.mkdir()
allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir))
tempDir.listFiles()
}
private def resolvedVersions = new scala.collection.mutable.HashMap[HiveVersion, Seq[File]]
}
/**
* Creates a Hive `ClientInterface` using a classloader that works according to the following rules:
* - Shared classes: Java, Scala, logging, and Spark classes are delegated to `baseClassLoader`
* allowing the results of calls to the `ClientInterface` to be visible externally.
* - Hive classes: new instances are loaded from `execJars`. These classes are not
* accessible externally due to their custom loading.
* - ClientWrapper: a new copy is created for each instance of `IsolatedClientLoader`.
* This new instance is able to see a specific version of hive without using reflection. Since
* this is a unique instance, it is not visible externally other than as a generic
* `ClientInterface`, unless `isolationOn` is set to `false`.
*
* @param version The version of hive on the classpath, used to pick specific function signatures
*                that are not compatible across versions.
* @param execJars A collection of jar files that must include hive and hadoop.
* @param config A set of options that will be added to the HiveConf of the constructed client.
* @param isolationOn When true, custom versions of barrier classes will be constructed. Must be
* true unless loading the version of hive that is on Spark's classloader.
* @param rootClassLoader The system root classloader. Must not know about hive classes.
* @param baseClassLoader The spark classloader that is used to load shared classes.
*
*/
class IsolatedClientLoader(
val version: HiveVersion,
val execJars: Seq[File] = Seq.empty,
val config: Map[String, String] = Map.empty,
val isolationOn: Boolean = true,
val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader)
extends Logging {
// Check to make sure that the root classloader does not know about Hive.
assert(Try(baseClassLoader.loadClass("org.apache.hive.HiveConf")).isFailure)
/** All jars used by the hive specific classloader. */
protected def allJars = execJars.map(_.toURI.toURL).toArray
protected def isSharedClass(name: String): Boolean =
name.contains("slf4j") ||
name.contains("log4j") ||
name.startsWith("org.apache.spark.") ||
name.startsWith("scala.") ||
name.startsWith("com.google") ||
name.startsWith("java.lang.") ||
name.startsWith("java.net")
/** True if `name` refers to a spark class that must see a specific version of Hive. */
protected def isBarrierClass(name: String): Boolean =
name.startsWith("org.apache.spark.sql.hive.execution.PairSerDe") ||
name.startsWith(classOf[ClientWrapper].getName) ||
name.startsWith(classOf[ReflectionMagic].getName)
protected def classToPath(name: String): String =
name.replaceAll("\\.", "/") + ".class"
/** The classloader that is used to load an isolated version of Hive. */
protected val classLoader: ClassLoader = new URLClassLoader(allJars, rootClassLoader) {
override def loadClass(name: String, resolve: Boolean): Class[_] = {
val loaded = findLoadedClass(name)
if (loaded == null) doLoadClass(name, resolve) else loaded
}
def doLoadClass(name: String, resolve: Boolean): Class[_] = {
val classFileName = name.replaceAll("\\.", "/") + ".class"
if (isBarrierClass(name) && isolationOn) {
val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
defineClass(name, bytes, 0, bytes.length)
} else if (!isSharedClass(name)) {
logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
super.loadClass(name, resolve)
} else {
logDebug(s"shared class: $name")
baseClassLoader.loadClass(name)
}
}
}
// Pre-reflective instantiation setup.
logDebug("Initializing the logger to avoid disaster...")
Thread.currentThread.setContextClassLoader(classLoader)
/** The isolated client interface to Hive. */
val client: ClientInterface = try {
classLoader
.loadClass(classOf[ClientWrapper].getName)
.getConstructors.head
.newInstance(version, config)
.asInstanceOf[ClientInterface]
} finally {
Thread.currentThread.setContextClassLoader(baseClassLoader)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive
/** Support for interacting with different versions of the HiveMetastoreClient */
package object client {
private[client] abstract class HiveVersion(val fullVersion: String, val hasBuiltinsJar: Boolean)
// scalastyle:off
private[client] object hive {
case object v10 extends HiveVersion("0.10.0", true)
case object v11 extends HiveVersion("0.11.0", false)
case object v12 extends HiveVersion("0.12.0", false)
case object v13 extends HiveVersion("0.13.1", false)
}
// scalastyle:on
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.client
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.util.Utils
import org.scalatest.FunSuite
class VersionsSuite extends FunSuite with Logging {
val testType = "derby"
private def buildConf() = {
lazy val warehousePath = Utils.createTempDir()
lazy val metastorePath = Utils.createTempDir()
metastorePath.delete()
Map(
"javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true",
"hive.metastore.warehouse.dir" -> warehousePath.toString)
}
test("success sanity check") {
val badClient = IsolatedClientLoader.forVersion("13", buildConf()).client
val db = new HiveDatabase("default", "")
badClient.createDatabase(db)
}
private def getNestedMessages(e: Throwable): String = {
var causes = ""
var lastException = e
while (lastException != null) {
causes += lastException.toString + "\n"
lastException = lastException.getCause
}
causes
}
// It's actually pretty easy to mess things up and have all of your tests "pass" by accidentally
// connecting to an auto-populated, in-process metastore. Let's make sure we are getting the
// versions right by forcing a known compatibility failure.
// TODO: currently only works on mysql where we manually create the schema...
ignore("failure sanity check") {
val e = intercept[Throwable] {
val badClient = quietly { IsolatedClientLoader.forVersion("13", buildConf()).client }
}
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
}
private val versions = Seq("12", "13")
private var client: ClientInterface = null
versions.foreach { version =>
test(s"$version: listTables") {
client = null
client = IsolatedClientLoader.forVersion(version, buildConf()).client
client.listTables("default")
}
test(s"$version: createDatabase") {
val db = HiveDatabase("default", "")
client.createDatabase(db)
}
test(s"$version: createTable") {
val table =
HiveTable(
specifiedDatabase = Option("default"),
name = "src",
schema = Seq(HiveColumn("key", "int", "")),
partitionColumns = Seq.empty,
properties = Map.empty,
serdeProperties = Map.empty,
tableType = ManagedTable,
location = None,
inputFormat =
Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName),
outputFormat =
Some(classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName),
serde =
Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()))
client.createTable(table)
}
test(s"$version: getTable") {
client.getTable("default", "src")
}
}
}