Commit 4b325c77 authored by Reynold Xin

[SPARK-5193][SQL] Tighten up HiveContext API

1. Removed the deprecated LocalHiveContext
2. Made private[sql] fields protected[sql] so they don't show up in javadoc.
3. Added javadoc to refreshTable.
4. Added Experimental tag to analyze command.

Author: Reynold Xin <rxin@databricks.com>

Closes #4054 from rxin/hivecontext-api and squashes the following commits:

25cc00a [Reynold Xin] Add implicit conversion back.
cbca886 [Reynold Xin] [SPARK-5193][SQL] Tighten up HiveContext API
parent 6abc45e3
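Because this commit removes the deprecated LocalHiveContext, existing callers need to construct a plain HiveContext instead. The following is a minimal, hypothetical migration sketch based only on the deprecation note and the removed configure() method visible in the diff below; the metastore and warehouse paths, master URL, and app name are illustrative, and without the explicit setConf calls HiveContext will create its local Derby metastore under ./metastore_db rather than ./metastore.

import java.io.File

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object MigrateFromLocalHiveContext {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("hive-migration"))

    // Plain HiveContext: with no hive-site.xml on the classpath it creates a
    // local Derby metastore under ./metastore_db.
    val hiveContext = new HiveContext(sc)

    // Only if the old ./metastore and ./warehouse locations must be kept,
    // reproduce what LocalHiveContext.configure() used to do (paths illustrative).
    val metastorePath = new File("metastore").getCanonicalPath
    val warehousePath = new File("warehouse").getCanonicalPath
    hiveContext.setConf("javax.jdo.option.ConnectionURL",
      s"jdbc:derby:;databaseName=$metastorePath;create=true")
    hiveContext.setConf("hive.metastore.warehouse.dir", warehousePath)

    hiveContext.sql("SHOW TABLES").collect().foreach(println)
  }
}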
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.hive
-import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
+import java.io.{BufferedReader, InputStreamReader, PrintStream}
 import java.sql.{Date, Timestamp}
 import scala.collection.JavaConversions._
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators, OverrideCatalog, OverrideFunctionRegistry}
@@ -42,28 +43,6 @@ import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTable
 import org.apache.spark.sql.sources.DataSourceStrategy
 import org.apache.spark.sql.types._
-/**
- * DEPRECATED: Use HiveContext instead.
- */
-@deprecated("""
-  Use HiveContext instead. It will still create a local metastore if one is not specified.
-  However, note that the default directory is ./metastore_db, not ./metastore
-  """, "1.1")
-class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) {
-  lazy val metastorePath = new File("metastore").getCanonicalPath
-  lazy val warehousePath: String = new File("warehouse").getCanonicalPath
-  /** Sets up the system initially or after a RESET command */
-  protected def configure() {
-    setConf("javax.jdo.option.ConnectionURL",
-      s"jdbc:derby:;databaseName=$metastorePath;create=true")
-    setConf("hive.metastore.warehouse.dir", warehousePath)
-  }
-  configure() // Must be called before initializing the catalog below.
-}
 /**
  * An instance of the Spark SQL execution engine that integrates with data stored in Hive.
  * Configuration for Hive is read from hive-site.xml on the classpath.
@@ -80,7 +59,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
   * SerDe.
   */
-  private[spark] def convertMetastoreParquet: Boolean =
+  protected[sql] def convertMetastoreParquet: Boolean =
     getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"
   override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
@@ -97,14 +76,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     }
   }
-  @deprecated("hiveql() is deprecated as the sql function now parses using HiveQL by default. " +
-    s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1")
-  def hiveql(hqlQuery: String): SchemaRDD = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
-  @deprecated("hql() is deprecated as the sql function now parses using HiveQL by default. " +
-    s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1")
-  def hql(hqlQuery: String): SchemaRDD = hiveql(hqlQuery)
   /**
    * Creates a table using the schema of the given class.
    *
@@ -116,6 +87,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
   }
+  /**
+   * Invalidate and refresh all the cached metadata of the given table. For performance reasons,
+   * Spark SQL or the external data source library it uses might cache certain metadata about a
+   * table, such as the location of blocks. When those change outside of Spark SQL, users should
+   * call this function to invalidate the cache.
+   */
   def refreshTable(tableName: String): Unit = {
     // TODO: Database support...
     catalog.refreshTable("default", tableName)
@@ -133,6 +110,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * Right now, it only supports Hive tables and it only updates the size of a Hive table
    * in the Hive metastore.
    */
+  @Experimental
   def analyze(tableName: String) {
     val relation = EliminateAnalysisOperators(catalog.lookupRelation(Seq(tableName)))
@@ -289,7 +267,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       results
     }
   /**
    * Execute the command using Hive and return the results as a sequence. Each element
    * in the sequence is one row.
@@ -345,7 +322,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   }
   @transient
-  val hivePlanner = new SparkPlanner with HiveStrategies {
+  private val hivePlanner = new SparkPlanner with HiveStrategies {
     val hiveContext = self
     override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq(
@@ -410,7 +387,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   }
 }
-object HiveContext {
+private object HiveContext {
   protected val primitiveTypes =
     Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
       ShortType, DateType, TimestampType, BinaryType)
...
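For reference, here is a small usage sketch of the two public methods touched above: refreshTable, which now carries the javadoc added in this commit, and analyze, which is now tagged @Experimental. It assumes a Hive table named "logs" already exists in the default database; the table name, master URL, and app name are illustrative only.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object RefreshAndAnalyzeExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("hive-api-example"))
    val hiveContext = new HiveContext(sc)

    // If the files backing "logs" were rewritten outside of Spark SQL (e.g. by a
    // Hive job), drop the cached metadata so the next query sees the new layout.
    hiveContext.refreshTable("logs")

    // Experimental: gather statistics and update the table's size recorded in the
    // Hive metastore; per the scaladoc this currently supports Hive tables only.
    hiveContext.analyze("logs")
  }
}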