Commit ec935abc authored by Reynold Xin, committed by Michael Armbrust

[SQL] Break hiveOperators.scala into multiple files.

The single file was getting very long (500+ loc).

Author: Reynold Xin <rxin@apache.org>

Closes #1166 from rxin/hiveOperators and squashes the following commits:

5b43068 [Reynold Xin] [SQL] Break hiveOperators.scala into multiple files.
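Judging from the class definitions in the diff below, the split yields one file per operator under org.apache.spark.sql.hive.execution, each presumably named after its primary class: DescribeHiveTableCommand, HiveTableScan, and NativeCommand move into their own source files, while InsertIntoHiveTable remains in the original file. (The file names here are inferred from the classes; the paths themselves are not visible in the diff.)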
Parent: ca5d8b59
@@ -64,7 +64,6 @@ private[hive] trait HiveStrategies {
val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
val (pruningPredicates, otherPredicates) = predicates.partition {
_.references.map(_.exprId).subsetOf(partitionKeyIds)
}
pruneFilterProject(
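The partition { ... } call above is plain Seq.partition: a predicate goes to the pruning side exactly when every column it references is a partition key. A minimal, self-contained sketch of the same split, using a hypothetical Pred stand-in rather than Catalyst expressions:

case class Pred(references: Set[String])

val partitionKeyIds = Set("ds", "hr")  // partition key columns
val predicates = Seq(
  Pred(Set("ds")),                     // references only partition keys
  Pred(Set("key")),                    // references a data column
  Pred(Set("ds", "hr")))               // references only partition keys

// Same shape as the Catalyst version: subsetOf decides the side.
val (pruningPredicates, otherPredicates) =
  predicates.partition(_.references.subsetOf(partitionKeyIds))

assert(pruningPredicates == Seq(Pred(Set("ds")), Pred(Set("ds", "hr"))))
assert(otherPredicates == Seq(Pred(Set("key"))))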
@@ -81,16 +80,16 @@ private[hive] trait HiveStrategies {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.NativeCommand(sql) =>
NativeCommand(sql, plan.output)(context) :: Nil
case describe: logical.DescribeCommand => {
case describe: logical.DescribeCommand =>
val resolvedTable = context.executePlan(describe.table).analyzed
resolvedTable match {
case t: MetastoreRelation =>
Seq(DescribeHiveTableCommand(
t, describe.output, describe.isExtended)(context))
Seq(DescribeHiveTableCommand(t, describe.output, describe.isExtended)(context))
case o: LogicalPlan =>
Seq(DescribeCommand(planLater(o), describe.output)(context))
}
}
case _ => Nil
}
}
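For context, a hedged sketch of how this dispatch is reached from user code; the hiveContext value and the src table are assumptions, and hql is the HiveQL entry point of this Spark era:

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext("local", "describe-example")
val hiveContext = new HiveContext(sc)

// "src" resolves to a MetastoreRelation, so the case above plans a
// DescribeHiveTableCommand; a non-metastore plan would fall through to
// the generic DescribeCommand instead.
hiveContext.hql("DESCRIBE EXTENDED src").collect().foreach(println)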
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution
import scala.collection.JavaConversions._
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row}
import org.apache.spark.sql.execution.{Command, LeafNode}
import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
/**
* :: DeveloperApi ::
*
* Implementation for "describe [extended] table".
*/
@DeveloperApi
case class DescribeHiveTableCommand(
table: MetastoreRelation,
output: Seq[Attribute],
isExtended: Boolean)(
@transient context: HiveContext)
extends LeafNode with Command {
// Strings formatted the same way as Hive's output, used for result comparison in our unit tests.
lazy val hiveString: Seq[String] = {
val alignment = 20
val delim = "\t"
sideEffectResult.map {
case (name, dataType, comment) =>
String.format("%-" + alignment + "s", name) + delim +
String.format("%-" + alignment + "s", dataType) + delim +
String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))
}
}
override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
// Mimics the format of Hive's output, though not byte-for-byte identical.
var results: Seq[(String, String, String)] = Nil
val columns: Seq[FieldSchema] = table.hiveQlTable.getCols
val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols
results ++= columns.map(field => (field.getName, field.getType, field.getComment))
if (partitionColumns.nonEmpty) {
val partColumnInfo =
partitionColumns.map(field => (field.getName, field.getType, field.getComment))
results ++=
partColumnInfo ++
Seq(("# Partition Information", "", "")) ++
Seq((s"# ${output.get(0).name}", output.get(1).name, output.get(2).name)) ++
partColumnInfo
}
if (isExtended) {
results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
}
results
}
override def execute(): RDD[Row] = {
val rows = sideEffectResult.map {
case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
}
context.sparkContext.parallelize(rows, 1)
}
override def otherCopyArgs = context :: Nil
}
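The "%-20s" format used by hiveString left-aligns each field in a 20-character column before joining with tabs. A standalone illustration on a made-up (name, dataType, comment) triple:

val alignment = 20
val delim = "\t"

// Hypothetical column metadata; a null comment prints as "None", as above.
val (name, dataType, comment) = ("key", "int", null: String)

// Each field is padded on the right to 20 characters.
val line =
  String.format("%-" + alignment + "s", name) + delim +
  String.format("%-" + alignment + "s", dataType) + delim +
  String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))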
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution
import scala.collection.JavaConversions._
import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive._
import org.apache.spark.util.MutablePair
/**
* :: DeveloperApi ::
* The Hive table scan operator. Column and partition pruning are both handled.
*
* @param attributes Attributes to be fetched from the Hive table.
* @param relation The Hive table to be scanned.
* @param partitionPruningPred An optional partition pruning predicate for partitioned tables.
*/
@DeveloperApi
case class HiveTableScan(
attributes: Seq[Attribute],
relation: MetastoreRelation,
partitionPruningPred: Option[Expression])(
@transient val context: HiveContext)
extends LeafNode
with HiveInspectors {
require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
"Partition pruning predicates only supported for partitioned tables.")
// Bind all partition key attribute references in the partition pruning predicate for later
// evaluation.
private[this] val boundPruningPred = partitionPruningPred.map { pred =>
require(
pred.dataType == BooleanType,
s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
BindReferences.bindReference(pred, relation.partitionKeys)
}
@transient
private[this] val hadoopReader = new HadoopTableReader(relation.tableDesc, context)
/**
* The hive object inspector for this table, which can be used to extract values from the
* serialized row representation.
*/
@transient
private[this] lazy val objectInspector =
relation.tableDesc.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector]
/**
* Functions that extract the requested attributes from the Hive output. Partition values are
* cast from strings to their declared data types.
*/
@transient
protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = {
attributes.map { a =>
val ordinal = relation.partitionKeys.indexOf(a)
if (ordinal >= 0) {
val dataType = relation.partitionKeys(ordinal).dataType
(_: Any, partitionKeys: Array[String]) => {
castFromString(partitionKeys(ordinal), dataType)
}
} else {
val ref = objectInspector.getAllStructFieldRefs
.find(_.getFieldName == a.name)
.getOrElse(sys.error(s"Can't find attribute $a"))
val fieldObjectInspector = ref.getFieldObjectInspector
val unwrapHiveData = fieldObjectInspector match {
case _: HiveVarcharObjectInspector =>
(value: Any) => value.asInstanceOf[HiveVarchar].getValue
case _: HiveDecimalObjectInspector =>
(value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue())
case _ =>
identity[Any] _
}
(row: Any, _: Array[String]) => {
val data = objectInspector.getStructFieldData(row, ref)
val hiveData = unwrapData(data, fieldObjectInspector)
if (hiveData != null) unwrapHiveData(hiveData) else null
}
}
}
}
private[this] def castFromString(value: String, dataType: DataType) = {
Cast(Literal(value), dataType).eval(null)
}
private def addColumnMetadataToConf(hiveConf: HiveConf) {
// Specifies IDs and internal names of columns to be scanned.
val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name): Integer)
val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",")
if (attributes.size == relation.output.size) {
ColumnProjectionUtils.setFullyReadColumns(hiveConf)
} else {
ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
}
ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name))
// Specifies types and object inspectors of columns to be scanned.
val structOI = ObjectInspectorUtils
.getStandardObjectInspector(
relation.tableDesc.getDeserializer.getObjectInspector,
ObjectInspectorCopyOption.JAVA)
.asInstanceOf[StructObjectInspector]
val columnTypeNames = structOI
.getAllStructFieldRefs
.map(_.getFieldObjectInspector)
.map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
.mkString(",")
hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames)
}
addColumnMetadataToConf(context.hiveconf)
private def inputRdd = if (!relation.hiveQlTable.isPartitioned) {
hadoopReader.makeRDDForTable(relation.hiveQlTable)
} else {
hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
}
/**
* Prunes partitions not involved in the query plan.
*
* @param partitions All partitions of the relation.
* @return Partitions that are involved in the query plan.
*/
private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
boundPruningPred match {
case None => partitions
case Some(shouldKeep) => partitions.filter { part =>
val dataTypes = relation.partitionKeys.map(_.dataType)
val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield {
castFromString(value, dataType)
}
// Only partitioned values are needed here, since the predicate has already been bound to
// partition key attribute references.
val row = new GenericRow(castedValues.toArray)
shouldKeep.eval(row).asInstanceOf[Boolean]
}
}
}
override def execute() = {
inputRdd.mapPartitions { iterator =>
if (iterator.isEmpty) {
Iterator.empty
} else {
val mutableRow = new GenericMutableRow(attributes.length)
val mutablePair = new MutablePair[Any, Array[String]]()
val buffered = iterator.buffered
// NOTE (lian): Critical path of Hive table scan, unnecessary FP style code and pattern
// matching are avoided intentionally.
val rowsAndPartitionKeys = buffered.head match {
// With partition keys
case _: Array[Any] =>
buffered.map { case array: Array[Any] =>
val deserializedRow = array(0)
val partitionKeys = array(1).asInstanceOf[Array[String]]
mutablePair.update(deserializedRow, partitionKeys)
}
// Without partition keys
case _ =>
val emptyPartitionKeys = Array.empty[String]
buffered.map { deserializedRow =>
mutablePair.update(deserializedRow, emptyPartitionKeys)
}
}
rowsAndPartitionKeys.map { pair =>
var i = 0
while (i < attributes.length) {
mutableRow(i) = attributeFunctions(i)(pair._1, pair._2)
i += 1
}
mutableRow: Row
}
}
}
}
override def output = attributes
}
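The NOTE in execute() concerns allocation on the scan's hot path: a single MutablePair and a single GenericMutableRow are updated and re-emitted for every input row instead of allocating per-row objects. A minimal, self-contained sketch of that reuse pattern, with a hypothetical ReusedPair standing in for MutablePair:

// One container instance is reused across all elements of the iterator.
final class ReusedPair[A, B] {
  var first: A = _
  var second: B = _
  def update(a: A, b: B): ReusedPair[A, B] = { first = a; second = b; this }
}

val pair = new ReusedPair[String, Int]
val out = Iterator("a" -> 1, "b" -> 2).map { case (s, i) => pair.update(s, i) }

// Every element is the same instance, so each value must be consumed (or
// copied) before advancing the iterator -- the same contract MutablePair has.
out.foreach(p => println(s"${p.first} ${p.second}"))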
@@ -17,222 +17,27 @@
package org.apache.spark.sql.hive.execution
import scala.collection.JavaConversions._
import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.MetaStoreUtils
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive}
import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils
import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred._
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
import org.apache.spark
import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive._
import org.apache.spark.util.MutablePair
import org.apache.spark.{TaskContext, SparkException}
/* Implicits */
import scala.collection.JavaConversions._
/**
* :: DeveloperApi ::
* The Hive table scan operator. Column and partition pruning are both handled.
*
* @param attributes Attributes to be fetched from the Hive table.
* @param relation The Hive table to be scanned.
* @param partitionPruningPred An optional partition pruning predicate for partitioned tables.
*/
@DeveloperApi
case class HiveTableScan(
attributes: Seq[Attribute],
relation: MetastoreRelation,
partitionPruningPred: Option[Expression])(
@transient val context: HiveContext)
extends LeafNode
with HiveInspectors {
require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
"Partition pruning predicates only supported for partitioned tables.")
// Bind all partition key attribute references in the partition pruning predicate for later
// evaluation.
private val boundPruningPred = partitionPruningPred.map { pred =>
require(
pred.dataType == BooleanType,
s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
BindReferences.bindReference(pred, relation.partitionKeys)
}
@transient
val hadoopReader = new HadoopTableReader(relation.tableDesc, context)
/**
* The hive object inspector for this table, which can be used to extract values from the
* serialized row representation.
*/
@transient
lazy val objectInspector =
relation.tableDesc.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector]
/**
* Functions that extract the requested attributes from the Hive output. Partition values are
* cast from strings to their declared data types.
*/
@transient
protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = {
attributes.map { a =>
val ordinal = relation.partitionKeys.indexOf(a)
if (ordinal >= 0) {
val dataType = relation.partitionKeys(ordinal).dataType
(_: Any, partitionKeys: Array[String]) => {
castFromString(partitionKeys(ordinal), dataType)
}
} else {
val ref = objectInspector.getAllStructFieldRefs
.find(_.getFieldName == a.name)
.getOrElse(sys.error(s"Can't find attribute $a"))
val fieldObjectInspector = ref.getFieldObjectInspector
val unwrapHiveData = fieldObjectInspector match {
case _: HiveVarcharObjectInspector =>
(value: Any) => value.asInstanceOf[HiveVarchar].getValue
case _: HiveDecimalObjectInspector =>
(value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue())
case _ =>
identity[Any] _
}
(row: Any, _: Array[String]) => {
val data = objectInspector.getStructFieldData(row, ref)
val hiveData = unwrapData(data, fieldObjectInspector)
if (hiveData != null) unwrapHiveData(hiveData) else null
}
}
}
}
private def castFromString(value: String, dataType: DataType) = {
Cast(Literal(value), dataType).eval(null)
}
private def addColumnMetadataToConf(hiveConf: HiveConf) {
// Specifies IDs and internal names of columns to be scanned.
val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name): Integer)
val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",")
if (attributes.size == relation.output.size) {
ColumnProjectionUtils.setFullyReadColumns(hiveConf)
} else {
ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
}
ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name))
// Specifies types and object inspectors of columns to be scanned.
val structOI = ObjectInspectorUtils
.getStandardObjectInspector(
relation.tableDesc.getDeserializer.getObjectInspector,
ObjectInspectorCopyOption.JAVA)
.asInstanceOf[StructObjectInspector]
val columnTypeNames = structOI
.getAllStructFieldRefs
.map(_.getFieldObjectInspector)
.map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
.mkString(",")
hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames)
}
addColumnMetadataToConf(context.hiveconf)
@transient
def inputRdd = if (!relation.hiveQlTable.isPartitioned) {
hadoopReader.makeRDDForTable(relation.hiveQlTable)
} else {
hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
}
/**
* Prunes partitions not involved in the query plan.
*
* @param partitions All partitions of the relation.
* @return Partitions that are involved in the query plan.
*/
private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
boundPruningPred match {
case None => partitions
case Some(shouldKeep) => partitions.filter { part =>
val dataTypes = relation.partitionKeys.map(_.dataType)
val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield {
castFromString(value, dataType)
}
// Only partitioned values are needed here, since the predicate has already been bound to
// partition key attribute references.
val row = new GenericRow(castedValues.toArray)
shouldKeep.eval(row).asInstanceOf[Boolean]
}
}
}
def execute() = {
inputRdd.mapPartitions { iterator =>
if (iterator.isEmpty) {
Iterator.empty
} else {
val mutableRow = new GenericMutableRow(attributes.length)
val mutablePair = new MutablePair[Any, Array[String]]()
val buffered = iterator.buffered
// NOTE (lian): Critical path of Hive table scan, unnecessary FP style code and pattern
// matching are avoided intentionally.
val rowsAndPartitionKeys = buffered.head match {
// With partition keys
case _: Array[Any] =>
buffered.map { case array: Array[Any] =>
val deserializedRow = array(0)
val partitionKeys = array(1).asInstanceOf[Array[String]]
mutablePair.update(deserializedRow, partitionKeys)
}
// Without partition keys
case _ =>
val emptyPartitionKeys = Array.empty[String]
buffered.map { deserializedRow =>
mutablePair.update(deserializedRow, emptyPartitionKeys)
}
}
rowsAndPartitionKeys.map { pair =>
var i = 0
while (i < attributes.length) {
mutableRow(i) = attributeFunctions(i)(pair._1, pair._2)
i += 1
}
mutableRow: Row
}
}
}
}
def output = attributes
}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter}
/**
* :: DeveloperApi ::
@@ -441,84 +246,3 @@ case class InsertIntoHiveTable(
sc.sparkContext.makeRDD(Nil, 1)
}
}
/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class NativeCommand(
sql: String, output: Seq[Attribute])(
@transient context: HiveContext)
extends LeafNode with Command {
override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql)
override def execute(): RDD[spark.sql.Row] = {
if (sideEffectResult.isEmpty) {
context.emptyResult
} else {
val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r)))
context.sparkContext.parallelize(rows, 1)
}
}
override def otherCopyArgs = context :: Nil
}
/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class DescribeHiveTableCommand(
table: MetastoreRelation,
output: Seq[Attribute],
isExtended: Boolean)(
@transient context: HiveContext)
extends LeafNode with Command {
// Strings formatted the same way as Hive's output, used for result comparison in our unit tests.
lazy val hiveString: Seq[String] = {
val alignment = 20
val delim = "\t"
sideEffectResult.map {
case (name, dataType, comment) =>
String.format("%-" + alignment + "s", name) + delim +
String.format("%-" + alignment + "s", dataType) + delim +
String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))
}
}
override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
// Mimics the format of Hive's output, though not byte-for-byte identical.
var results: Seq[(String, String, String)] = Nil
val columns: Seq[FieldSchema] = table.hiveQlTable.getCols
val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols
results ++= columns.map(field => (field.getName, field.getType, field.getComment))
if (partitionColumns.nonEmpty) {
val partColumnInfo =
partitionColumns.map(field => (field.getName, field.getType, field.getComment))
results ++=
partColumnInfo ++
Seq(("# Partition Information", "", "")) ++
Seq((s"# ${output.get(0).name}", output.get(1).name, output.get(2).name)) ++
partColumnInfo
}
if (isExtended) {
results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
}
results
}
override def execute(): RDD[Row] = {
val rows = sideEffectResult.map {
case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
}
context.sparkContext.parallelize(rows, 1)
}
override def otherCopyArgs = context :: Nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row}
import org.apache.spark.sql.execution.{Command, LeafNode}
import org.apache.spark.sql.hive.HiveContext
/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class NativeCommand(
sql: String, output: Seq[Attribute])(
@transient context: HiveContext)
extends LeafNode with Command {
override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql)
override def execute(): RDD[Row] = {
if (sideEffectResult.isEmpty) {
context.emptyResult
} else {
val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r)))
context.sparkContext.parallelize(rows, 1)
}
}
override def otherCopyArgs = context :: Nil
}
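A hedged usage sketch: statements Spark SQL does not evaluate itself are forwarded verbatim to Hive through runSqlHive, and each line of Hive's textual output comes back as a one-column row. The hiveContext value and table name are assumptions:

// Hive executes the DDL; NativeCommand wraps each output string in a GenericRow.
hiveContext.hql("CREATE TABLE IF NOT EXISTS tmp_example (key INT, value STRING)")
hiveContext.hql("SHOW TABLES").collect().foreach(println)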