Skip to content
Snippets Groups Projects
Commit 94624eac authored by Davies Liu's avatar Davies Liu Committed by Yin Huai
Browse files

[SPARK-11739][SQL] clear the instantiated SQLContext

Currently, if the first SQLContext is not removed after stopping SparkContext, a SQLContext could set there forever. This patch make this more robust.

Author: Davies Liu <davies@databricks.com>

Closes #9706 from davies/clear_context.
parent 6f99522d
No related branches found
No related tags found
No related merge requests found
...@@ -1229,7 +1229,7 @@ class SQLContext private[sql]( ...@@ -1229,7 +1229,7 @@ class SQLContext private[sql](
// construction of the instance. // construction of the instance.
sparkContext.addSparkListener(new SparkListener { sparkContext.addSparkListener(new SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
SQLContext.clearInstantiatedContext(self) SQLContext.clearInstantiatedContext()
} }
}) })
...@@ -1270,13 +1270,13 @@ object SQLContext { ...@@ -1270,13 +1270,13 @@ object SQLContext {
*/ */
def getOrCreate(sparkContext: SparkContext): SQLContext = { def getOrCreate(sparkContext: SparkContext): SQLContext = {
val ctx = activeContext.get() val ctx = activeContext.get()
if (ctx != null) { if (ctx != null && !ctx.sparkContext.isStopped) {
return ctx return ctx
} }
synchronized { synchronized {
val ctx = instantiatedContext.get() val ctx = instantiatedContext.get()
if (ctx == null) { if (ctx == null || ctx.sparkContext.isStopped) {
new SQLContext(sparkContext) new SQLContext(sparkContext)
} else { } else {
ctx ctx
...@@ -1284,12 +1284,17 @@ object SQLContext { ...@@ -1284,12 +1284,17 @@ object SQLContext {
} }
} }
private[sql] def clearInstantiatedContext(sqlContext: SQLContext): Unit = { private[sql] def clearInstantiatedContext(): Unit = {
instantiatedContext.compareAndSet(sqlContext, null) instantiatedContext.set(null)
} }
private[sql] def setInstantiatedContext(sqlContext: SQLContext): Unit = { private[sql] def setInstantiatedContext(sqlContext: SQLContext): Unit = {
instantiatedContext.compareAndSet(null, sqlContext) synchronized {
val ctx = instantiatedContext.get()
if (ctx == null || ctx.sparkContext.isStopped) {
instantiatedContext.set(sqlContext)
}
}
} }
private[sql] def getInstantiatedContextOption(): Option[SQLContext] = { private[sql] def getInstantiatedContextOption(): Option[SQLContext] = {
......
...@@ -31,7 +31,7 @@ class MultiSQLContextsSuite extends SparkFunSuite with BeforeAndAfterAll { ...@@ -31,7 +31,7 @@ class MultiSQLContextsSuite extends SparkFunSuite with BeforeAndAfterAll {
originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption() originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption()
SQLContext.clearActive() SQLContext.clearActive()
originalInstantiatedSQLContext.foreach(ctx => SQLContext.clearInstantiatedContext(ctx)) SQLContext.clearInstantiatedContext()
sparkConf = sparkConf =
new SparkConf(false) new SparkConf(false)
.setMaster("local[*]") .setMaster("local[*]")
...@@ -89,10 +89,9 @@ class MultiSQLContextsSuite extends SparkFunSuite with BeforeAndAfterAll { ...@@ -89,10 +89,9 @@ class MultiSQLContextsSuite extends SparkFunSuite with BeforeAndAfterAll {
testNewSession(rootSQLContext) testNewSession(rootSQLContext)
testNewSession(rootSQLContext) testNewSession(rootSQLContext)
testCreatingNewSQLContext(allowMultipleSQLContexts) testCreatingNewSQLContext(allowMultipleSQLContexts)
SQLContext.clearInstantiatedContext(rootSQLContext)
} finally { } finally {
sc.stop() sc.stop()
SQLContext.clearInstantiatedContext()
} }
} }
} }
......
...@@ -34,7 +34,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { ...@@ -34,7 +34,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption() originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption()
SQLContext.clearActive() SQLContext.clearActive()
originalInstantiatedSQLContext.foreach(ctx => SQLContext.clearInstantiatedContext(ctx)) SQLContext.clearInstantiatedContext()
} }
override protected def afterAll(): Unit = { override protected def afterAll(): Unit = {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment