Skip to content
Snippets Groups Projects
Commit 8a354bef authored by Yin Huai's avatar Yin Huai Committed by Davies Liu
Browse files

[SPARK-11042] [SQL] Add a mechanism to ban creating multiple root SQLContexts/HiveContexts in a JVM

https://issues.apache.org/jira/browse/SPARK-11042

Author: Yin Huai <yhuai@databricks.com>

Closes #9058 from yhuai/SPARK-11042.
parent 2e572c41
No related branches found
No related tags found
No related merge requests found
......@@ -186,6 +186,16 @@ private[spark] object SQLConf {
import SQLConfEntry._
val ALLOW_MULTIPLE_CONTEXTS = booleanConf("spark.sql.allowMultipleContexts",
defaultValue = Some(true),
doc = "When set to true, creating multiple SQLContexts/HiveContexts is allowed." +
"When set to false, only one SQLContext/HiveContext is allowed to be created " +
"through the constructor (new SQLContexts/HiveContexts created through newSession " +
"method is allowed). Please note that this conf needs to be set in Spark Conf. Once" +
"a SQLContext/HiveContext has been created, changing the value of this conf will not" +
"have effect.",
isPublic = true)
val COMPRESS_CACHED = booleanConf("spark.sql.inMemoryColumnarStorage.compressed",
defaultValue = Some(true),
doc = "When set to true Spark SQL will automatically select a compression codec for each " +
......
......@@ -26,7 +26,7 @@ import scala.collection.immutable
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal
import org.apache.spark.SparkContext
import org.apache.spark.{SparkException, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
......@@ -64,14 +64,37 @@ import org.apache.spark.util.Utils
*/
class SQLContext private[sql](
@transient val sparkContext: SparkContext,
@transient protected[sql] val cacheManager: CacheManager)
@transient protected[sql] val cacheManager: CacheManager,
val isRootContext: Boolean)
extends org.apache.spark.Logging with Serializable {
self =>
def this(sparkContext: SparkContext) = this(sparkContext, new CacheManager)
def this(sparkContext: SparkContext) = this(sparkContext, new CacheManager, true)
def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
// If spark.sql.allowMultipleContexts is true, we will throw an exception if a user
// wants to create a new root SQLContext (a SLQContext that is not created by newSession).
private val allowMultipleContexts =
sparkContext.conf.getBoolean(
SQLConf.ALLOW_MULTIPLE_CONTEXTS.key,
SQLConf.ALLOW_MULTIPLE_CONTEXTS.defaultValue.get)
// Assert no root SQLContext is running when allowMultipleContexts is false.
{
if (!allowMultipleContexts && isRootContext) {
SQLContext.getInstantiatedContextOption() match {
case Some(rootSQLContext) =>
val errMsg = "Only one SQLContext/HiveContext may be running in this JVM. " +
s"It is recommended to use SQLContext.getOrCreate to get the instantiated " +
s"SQLContext/HiveContext. To ignore this error, " +
s"set ${SQLConf.ALLOW_MULTIPLE_CONTEXTS.key} = true in SparkConf."
throw new SparkException(errMsg)
case None => // OK
}
}
}
/**
* Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
* registered functions, but sharing the same SparkContext and CacheManager.
......@@ -79,7 +102,10 @@ class SQLContext private[sql](
* @since 1.6.0
*/
def newSession(): SQLContext = {
new SQLContext(sparkContext, cacheManager)
new SQLContext(
sparkContext = sparkContext,
cacheManager = cacheManager,
isRootContext = false)
}
/**
......@@ -1239,6 +1265,10 @@ object SQLContext {
instantiatedContext.compareAndSet(null, sqlContext)
}
private[sql] def getInstantiatedContextOption(): Option[SQLContext] = {
Option(instantiatedContext.get())
}
/**
* Changes the SQLContext that will be returned in this thread and its children when
* SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
......@@ -1260,6 +1290,10 @@ object SQLContext {
activeContext.remove()
}
private[sql] def getActiveContextOption(): Option[SQLContext] = {
Option(activeContext.get())
}
/**
* Converts an iterator of Java Beans to InternalRow using the provided
* bean info & schema. This is not related to the singleton, but is a static
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import org.apache.spark._
import org.scalatest.BeforeAndAfterAll
class MultiSQLContextsSuite extends SparkFunSuite with BeforeAndAfterAll {
private var originalActiveSQLContext: Option[SQLContext] = _
private var originalInstantiatedSQLContext: Option[SQLContext] = _
private var sparkConf: SparkConf = _
override protected def beforeAll(): Unit = {
originalActiveSQLContext = SQLContext.getActiveContextOption()
originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption()
SQLContext.clearActive()
originalInstantiatedSQLContext.foreach(ctx => SQLContext.clearInstantiatedContext(ctx))
sparkConf =
new SparkConf(false)
.setMaster("local[*]")
.setAppName("test")
.set("spark.ui.enabled", "false")
.set("spark.driver.allowMultipleContexts", "true")
}
override protected def afterAll(): Unit = {
// Set these states back.
originalActiveSQLContext.foreach(ctx => SQLContext.setActive(ctx))
originalInstantiatedSQLContext.foreach(ctx => SQLContext.setInstantiatedContext(ctx))
}
def testNewSession(rootSQLContext: SQLContext): Unit = {
// Make sure we can successfully create new Session.
rootSQLContext.newSession()
// Reset the state. It is always safe to clear the active context.
SQLContext.clearActive()
}
def testCreatingNewSQLContext(allowsMultipleContexts: Boolean): Unit = {
val conf =
sparkConf
.clone
.set(SQLConf.ALLOW_MULTIPLE_CONTEXTS.key, allowsMultipleContexts.toString)
val sparkContext = new SparkContext(conf)
try {
if (allowsMultipleContexts) {
new SQLContext(sparkContext)
SQLContext.clearActive()
} else {
// If allowsMultipleContexts is false, make sure we can get the error.
val message = intercept[SparkException] {
new SQLContext(sparkContext)
}.getMessage
assert(message.contains("Only one SQLContext/HiveContext may be running"))
}
} finally {
sparkContext.stop()
}
}
test("test the flag to disallow creating multiple root SQLContext") {
Seq(false, true).foreach { allowMultipleSQLContexts =>
val conf =
sparkConf
.clone
.set(SQLConf.ALLOW_MULTIPLE_CONTEXTS.key, allowMultipleSQLContexts.toString)
val sc = new SparkContext(conf)
try {
val rootSQLContext = new SQLContext(sc)
testNewSession(rootSQLContext)
testNewSession(rootSQLContext)
testCreatingNewSQLContext(allowMultipleSQLContexts)
SQLContext.clearInstantiatedContext(rootSQLContext)
} finally {
sc.stop()
}
}
}
}
......@@ -89,10 +89,11 @@ class HiveContext private[hive](
sc: SparkContext,
cacheManager: CacheManager,
@transient execHive: ClientWrapper,
@transient metaHive: ClientInterface) extends SQLContext(sc, cacheManager) with Logging {
@transient metaHive: ClientInterface,
isRootContext: Boolean) extends SQLContext(sc, cacheManager, isRootContext) with Logging {
self =>
def this(sc: SparkContext) = this(sc, new CacheManager, null, null)
def this(sc: SparkContext) = this(sc, new CacheManager, null, null, true)
def this(sc: JavaSparkContext) = this(sc.sc)
import org.apache.spark.sql.hive.HiveContext._
......@@ -105,7 +106,12 @@ class HiveContext private[hive](
* and Hive client (both of execution and metadata) with existing HiveContext.
*/
override def newSession(): HiveContext = {
new HiveContext(sc, cacheManager, executionHive.newSession(), metadataHive.newSession())
new HiveContext(
sc = sc,
cacheManager = cacheManager,
execHive = executionHive.newSession(),
metaHive = metadataHive.newSession(),
isRootContext = false)
}
/**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment