Commit 6dd64587 authored by Cheolsoo Park, committed by Reynold Xin

[SPARK-7850][BUILD] Hive 0.12.0 profile in POM should be removed

I grepped for hive-0.12.0 in the source code and removed all the related profiles and doc references.

Author: Cheolsoo Park <cheolsoop@netflix.com>

Closes #6393 from piaozhexiu/SPARK-7850 and squashes the following commits:

fb429ce [Cheolsoo Park] Remove hive-0.13.1 profile
82bf09a [Cheolsoo Park] Remove hive 0.12.0 shim code
f3722da [Cheolsoo Park] Remove hive-0.12.0 profile and references from POM and build docs
parent a9f1c0c5
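The same grep-based approach doubles as a quick sanity check that nothing was missed. The sketch below is purely illustrative and assumes a local checkout of the Spark source tree; it is not part of the commit itself.

{% highlight bash %}
# Illustrative only (assumes a local checkout of the Spark repo): any hit would
# indicate a profile, doc reference, or shim left behind by this cleanup.
git grep -n "hive-0.12.0"
git grep -n "hive-0.13.1"   # this profile id is also removed; Hive 0.13.1 is now the default
{% endhighlight %}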
@@ -118,14 +118,10 @@ mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=2.2.0 -DskipTests
# Building With Hive and JDBC Support
To enable Hive integration for Spark SQL along with its JDBC server and CLI,
add the `-Phive` and `-Phive-thriftserver` profiles to your existing build options.
By default Spark will build with Hive 0.13.1 bindings. You can also build for
Hive 0.12.0 using the `-Phive-0.12.0` profile.
By default Spark will build with Hive 0.13.1 bindings.
{% highlight bash %}
# Apache Hadoop 2.4.X with Hive 13 support
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package
# Apache Hadoop 2.4.X with Hive 12 support
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package
{% endhighlight %}
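With the `-Phive-0.12.0` profile gone, the Hive version a build picks up comes entirely from the default POM properties. One way to confirm the resolved value is the standard Maven `help:evaluate` goal; the command below is only a sketch, assuming it is run from the Spark root and that the property keeps the `hive.version` name used by the removed profiles.

{% highlight bash %}
# Illustrative check of the Hive version Maven resolves for a -Phive build
# (assumes maven-help-plugin is available); the [INFO] log lines are filtered out,
# leaving the property value, which should match the Hive 0.13.1 bindings above.
mvn help:evaluate -Dexpression=hive.version -Phive -Phive-thriftserver | grep -v '^\['
{% endhighlight %}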
# Building for Scala 2.11
......
@@ -1753,22 +1753,6 @@
<module>sql/hive-thriftserver</module>
</modules>
</profile>
<profile>
<id>hive-0.12.0</id>
<properties>
<hive.version>0.12.0-protobuf-2.5</hive.version>
<hive.version.short>0.12.0</hive.version.short>
<derby.version>10.4.2.0</derby.version>
</properties>
</profile>
<profile>
<id>hive-0.13.1</id>
<properties>
<hive.version>0.13.1a</hive.version>
<hive.version.short>0.13.1</hive.version.short>
<derby.version>10.10.1.1</derby.version>
</properties>
</profile>
<profile>
<id>scala-2.10</id>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver
import java.sql.{Date, Timestamp}
import java.util.concurrent.Executors
import java.util.{ArrayList => JArrayList, Map => JMap, UUID}
import org.apache.commons.logging.Log
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, Map => SMap}
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.{SessionManager, HiveSession}
import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLConf, Row => SparkRow}
import org.apache.spark.sql.execution.SetCommand
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
import org.apache.spark.sql.types._
/**
* A compatibility layer for interacting with Hive version 0.12.0.
*/
private[thriftserver] object HiveThriftServerShim {
val version = "0.12.0"
def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService: SparkSQLCLIService) = {
val serverUserName = ShimLoader.getHadoopShims.getShortUserName(sparkServiceUGI)
setSuperField(sparkCliService, "serverUserName", serverUserName)
}
}
private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveContext)
extends AbstractSparkSQLDriver(_context) {
override def getResults(res: JArrayList[String]): Boolean = {
if (hiveResponse == null) {
false
} else {
res.addAll(hiveResponse)
hiveResponse = null
true
}
}
}
private[hive] class SparkExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String])(
hiveContext: HiveContext,
sessionToActivePool: SMap[SessionHandle, String])
extends ExecuteStatementOperation(parentSession, statement, confOverlay) with Logging {
private var result: DataFrame = _
private var iter: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _
def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
logDebug("CLOSING")
}
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
if (!iter.hasNext) {
new RowSet()
} else {
// maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
val maxRows = maxRowsL.toInt
var curRow = 0
var rowSet = new ArrayBuffer[Row](maxRows.min(1024))
while (curRow < maxRows && iter.hasNext) {
val sparkRow = iter.next()
val row = new Row()
var curCol = 0
while (curCol < sparkRow.length) {
if (sparkRow.isNullAt(curCol)) {
addNullColumnValue(sparkRow, row, curCol)
} else {
addNonNullColumnValue(sparkRow, row, curCol)
}
curCol += 1
}
rowSet += row
curRow += 1
}
new RowSet(rowSet, 0)
}
}
def addNonNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
dataTypes(ordinal) match {
case StringType =>
to.addString(from(ordinal).asInstanceOf[String])
case IntegerType =>
to.addColumnValue(ColumnValue.intValue(from.getInt(ordinal)))
case BooleanType =>
to.addColumnValue(ColumnValue.booleanValue(from.getBoolean(ordinal)))
case DoubleType =>
to.addColumnValue(ColumnValue.doubleValue(from.getDouble(ordinal)))
case FloatType =>
to.addColumnValue(ColumnValue.floatValue(from.getFloat(ordinal)))
case DecimalType() =>
val hiveDecimal = from.getDecimal(ordinal)
to.addColumnValue(ColumnValue.stringValue(new HiveDecimal(hiveDecimal)))
case LongType =>
to.addColumnValue(ColumnValue.longValue(from.getLong(ordinal)))
case ByteType =>
to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
case ShortType =>
to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
case DateType =>
to.addColumnValue(ColumnValue.dateValue(from(ordinal).asInstanceOf[Date]))
case TimestampType =>
to.addColumnValue(
ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
to.addColumnValue(ColumnValue.stringValue(hiveString))
}
}
def addNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
dataTypes(ordinal) match {
case StringType =>
to.addString(null)
case IntegerType =>
to.addColumnValue(ColumnValue.intValue(null))
case BooleanType =>
to.addColumnValue(ColumnValue.booleanValue(null))
case DoubleType =>
to.addColumnValue(ColumnValue.doubleValue(null))
case FloatType =>
to.addColumnValue(ColumnValue.floatValue(null))
case DecimalType() =>
to.addColumnValue(ColumnValue.stringValue(null: HiveDecimal))
case LongType =>
to.addColumnValue(ColumnValue.longValue(null))
case ByteType =>
to.addColumnValue(ColumnValue.byteValue(null))
case ShortType =>
to.addColumnValue(ColumnValue.shortValue(null))
case DateType =>
to.addColumnValue(ColumnValue.dateValue(null))
case TimestampType =>
to.addColumnValue(ColumnValue.timestampValue(null))
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
to.addColumnValue(ColumnValue.stringValue(null: String))
}
}
def getResultSetSchema: TableSchema = {
logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
if (result.queryExecution.analyzed.output.size == 0) {
new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
} else {
val schema = result.queryExecution.analyzed.output.map { attr =>
new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
}
new TableSchema(schema)
}
}
def run(): Unit = {
val statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement'")
setState(OperationState.RUNNING)
HiveThriftServer2.listener.onStatementStart(
statementId, parentSession.getSessionHandle.getSessionId.toString, statement, statementId)
hiveContext.sparkContext.setJobGroup(statementId, statement)
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
try {
result = hiveContext.sql(statement)
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value))), _) =>
sessionToActivePool(parentSession.getSessionHandle) = value
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
}
HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
iter = {
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
if (useIncrementalCollect) {
result.rdd.toLocalIterator
} else {
result.collect().iterator
}
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
setHasResultSet(true)
} catch {
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, e.getStackTraceString)
logError("Error executing query:",e)
throw new HiveSQLException(e.toString)
}
setState(OperationState.FINISHED)
HiveThriftServer2.listener.onStatementFinish(statementId)
}
}
private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
extends SessionManager
with ReflectedCompositeService {
private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)
val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
getAncestorField[Log](this, 3, "LOG").info(
s"HiveServer2: Async execution pool size $backgroundPoolSize")
setSuperField(this, "operationManager", sparkSqlOperationManager)
addService(sparkSqlOperationManager)
initCompositeService(hiveConf)
}
override def openSession(
username: String,
passwd: String,
sessionConf: java.util.Map[String, String],
withImpersonation: Boolean,
delegationToken: String): SessionHandle = {
hiveContext.openSession()
val sessionHandle = super.openSession(
username, passwd, sessionConf, withImpersonation, delegationToken)
HiveThriftServer2.listener.onSessionCreated("UNKNOWN", sessionHandle.getSessionId.toString)
sessionHandle
}
override def closeSession(sessionHandle: SessionHandle) {
HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
super.closeSession(sessionHandle)
sparkSqlOperationManager.sessionToActivePool -= sessionHandle
hiveContext.detachSession()
}
}
@@ -136,16 +136,6 @@
</plugins>
</build>
</profile>
<profile>
<id>hive-0.12.0</id>
<dependencies>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-hive-bundle</artifactId>
<version>1.5.0</version>
</dependency>
</dependencies>
</profile>
</profiles>
<build>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive
import java.net.URI
import java.util.{ArrayList => JArrayList, Properties}
import scala.collection.JavaConversions._
import scala.language.implicitConversions
import org.apache.hadoop.{io => hadoopIo}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.stats.StatsSetupConst
import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Deserializer, io => hiveIo}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, ObjectInspector, PrimitiveObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory
import org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector, PrimitiveObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfo, TypeInfoFactory}
import org.apache.hadoop.io.{NullWritable, Writable}
import org.apache.hadoop.mapred.InputFormat
import org.apache.spark.sql.types.{UTF8String, Decimal, DecimalType}
private[hive] case class HiveFunctionWrapper(functionClassName: String)
extends java.io.Serializable {
// for Serialization
def this() = this(null)
import org.apache.spark.util.Utils._
def createFunction[UDFType <: AnyRef](): UDFType = {
getContextOrSparkClassLoader
.loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
}
}
/**
* A compatibility layer for interacting with Hive version 0.12.0.
*/
private[hive] object HiveShim {
val version = "0.12.0"
def getTableDesc(
serdeClass: Class[_ <: Deserializer],
inputFormatClass: Class[_ <: InputFormat[_, _]],
outputFormatClass: Class[_],
properties: Properties) = {
new TableDesc(serdeClass, inputFormatClass, outputFormatClass, properties)
}
def getStringWritableConstantObjectInspector(value: Any): ObjectInspector =
PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
PrimitiveCategory.STRING,
getStringWritable(value))
def getIntWritableConstantObjectInspector(value: Any): ObjectInspector =
PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
PrimitiveCategory.INT,
getIntWritable(value))
def getDoubleWritableConstantObjectInspector(value: Any): ObjectInspector =
PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
PrimitiveCategory.DOUBLE,
getDoubleWritable(value))
def getBooleanWritableConstantObjectInspector(value: Any): ObjectInspector =
PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
PrimitiveCategory.BOOLEAN,
getBooleanWritable(value))
def getLongWritableConstantObjectInspector(value: Any): ObjectInspector =
PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
PrimitiveCategory.LONG,
getLongWritable(value))
def getFloatWritableConstantObjectInspector(value: Any): ObjectInspector =
PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
PrimitiveCategory.FLOAT,
getFloatWritable(value))
def getShortWritableConstantObjectInspector(value: Any): ObjectInspector =
PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
PrimitiveCategory.SHORT,
getShortWritable(value))
def getByteWritableConstantObjectInspector(value: Any): ObjectInspector =
PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
PrimitiveCategory.BYTE,
getByteWritable(value))
def getBinaryWritableConstantObjectInspector(value: Any): ObjectInspector =
PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
PrimitiveCategory.BINARY,
getBinaryWritable(value))
def getDateWritableConstantObjectInspector(value: Any): ObjectInspector =
PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
PrimitiveCategory.DATE,
getDateWritable(value))
def getTimestampWritableConstantObjectInspector(value: Any): ObjectInspector =
PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
PrimitiveCategory.TIMESTAMP,
getTimestampWritable(value))
def getDecimalWritableConstantObjectInspector(value: Any): ObjectInspector =
PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
PrimitiveCategory.DECIMAL,
getDecimalWritable(value))
def getPrimitiveNullWritableConstantObjectInspector: ObjectInspector =
PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
PrimitiveCategory.VOID, null)
def getStringWritable(value: Any): hadoopIo.Text =
if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].toString)
def getIntWritable(value: Any): hadoopIo.IntWritable =
if (value == null) null else new hadoopIo.IntWritable(value.asInstanceOf[Int])
def getDoubleWritable(value: Any): hiveIo.DoubleWritable =
if (value == null) null else new hiveIo.DoubleWritable(value.asInstanceOf[Double])
def getBooleanWritable(value: Any): hadoopIo.BooleanWritable =
if (value == null) null else new hadoopIo.BooleanWritable(value.asInstanceOf[Boolean])
def getLongWritable(value: Any): hadoopIo.LongWritable =
if (value == null) null else new hadoopIo.LongWritable(value.asInstanceOf[Long])
def getFloatWritable(value: Any): hadoopIo.FloatWritable =
if (value == null) null else new hadoopIo.FloatWritable(value.asInstanceOf[Float])
def getShortWritable(value: Any): hiveIo.ShortWritable =
if (value == null) null else new hiveIo.ShortWritable(value.asInstanceOf[Short])
def getByteWritable(value: Any): hiveIo.ByteWritable =
if (value == null) null else new hiveIo.ByteWritable(value.asInstanceOf[Byte])
def getBinaryWritable(value: Any): hadoopIo.BytesWritable =
if (value == null) null else new hadoopIo.BytesWritable(value.asInstanceOf[Array[Byte]])
def getDateWritable(value: Any): hiveIo.DateWritable =
if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[Int])
def getTimestampWritable(value: Any): hiveIo.TimestampWritable =
if (value == null) {
null
} else {
new hiveIo.TimestampWritable(value.asInstanceOf[java.sql.Timestamp])
}
def getDecimalWritable(value: Any): hiveIo.HiveDecimalWritable =
if (value == null) {
null
} else {
new hiveIo.HiveDecimalWritable(
HiveShim.createDecimal(value.asInstanceOf[Decimal].toJavaBigDecimal))
}
def getPrimitiveNullWritable: NullWritable = NullWritable.get()
def createDriverResultsArray = new JArrayList[String]
def processResults(results: JArrayList[String]) = results
def getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE
def getStatsSetupConstRawDataSize = StatsSetupConst.RAW_DATA_SIZE
def createDefaultDBIfNeeded(context: HiveContext) = { }
def getCommandProcessor(cmd: Array[String], conf: HiveConf) = {
CommandProcessorFactory.get(cmd(0), conf)
}
def createDecimal(bd: java.math.BigDecimal): HiveDecimal = {
new HiveDecimal(bd)
}
def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) {
ColumnProjectionUtils.appendReadColumnIDs(conf, ids)
ColumnProjectionUtils.appendReadColumnNames(conf, names)
}
def getExternalTmpPath(context: Context, uri: URI) = {
context.getExternalTmpFileURI(uri)
}
def getDataLocationPath(p: Partition) = p.getPartitionPath
def getAllPartitionsOf(client: Hive, tbl: Table) = client.getAllPartitionsForPruner(tbl)
def compatibilityBlackList = Seq(
"decimal_.*",
"udf7",
"drop_partitions_filter2",
"show_.*",
"serde_regex",
"udf_to_date",
"udaf_collect_set",
"udf_concat"
)
def setLocation(tbl: Table, crtTbl: CreateTableDesc): Unit = {
tbl.setDataLocation(new Path(crtTbl.getLocation()).toUri())
}
def decimalMetastoreString(decimalType: DecimalType): String = "decimal"
def decimalTypeInfo(decimalType: DecimalType): TypeInfo =
TypeInfoFactory.decimalTypeInfo
def decimalTypeInfoToCatalyst(inspector: PrimitiveObjectInspector): DecimalType = {
DecimalType.Unlimited
}
def toCatalystDecimal(hdoi: HiveDecimalObjectInspector, data: Any): Decimal = {
if (hdoi.preferWritable()) {
Decimal(hdoi.getPrimitiveWritableObject(data).getHiveDecimal().bigDecimalValue)
} else {
Decimal(hdoi.getPrimitiveJavaObject(data).bigDecimalValue())
}
}
def getConvertedOI(
inputOI: ObjectInspector,
outputOI: ObjectInspector): ObjectInspector = {
ObjectInspectorConverters.getConvertedOI(inputOI, outputOI, true)
}
def prepareWritable(w: Writable): Writable = {
w
}
def setTblNullFormat(crtTbl: CreateTableDesc, tbl: Table) = {}
}
private[hive] class ShimFileSinkDesc(
var dir: String,
var tableInfo: TableDesc,
var compressed: Boolean)
extends FileSinkDesc(dir, tableInfo, compressed) {
}