Commit caebd7f2, authored by Takuya UESHIN, committed by Wenchen Fan

[SPARK-15870][SQL] DataFrame can't execute after uncacheTable.

## What changes were proposed in this pull request?

If a cached `DataFrame` is executed more than once and then uncached with `uncacheTable`, like the following:

```scala
    val selectStar = sql("SELECT * FROM testData WHERE key = 1")
    selectStar.createOrReplaceTempView("selectStar")

    spark.catalog.cacheTable("selectStar")
    checkAnswer(
      selectStar,
      Seq(Row(1, "1")))

    spark.catalog.uncacheTable("selectStar")
    checkAnswer(
      selectStar,
      Seq(Row(1, "1")))
```

then the uncached `DataFrame` can no longer execute, failing with a `Task not serializable` exception like:

```
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2038)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1912)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:884)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:883)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
...
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
	at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:153)
	at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
...
```
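
The `Caused by` line shows the root cause: an `AccumulatorV2` that is not (or is no longer) registered with the driver cannot be serialized into a task. Below is a standalone sketch of the same failure path, using a fresh, never-registered accumulator rather than the internal `batchStats` one (hypothetical snippet, not the code from this PR):

```scala
import org.apache.spark.util.LongAccumulator

// Accumulators are normally registered at creation time, e.g. via
// spark.sparkContext.longAccumulator("name"). Constructing one directly
// skips registration:
val unregistered = new LongAccumulator

// Referencing it inside a task closure forces it to be serialized to the
// executors, and AccumulatorV2.writeReplace rejects unregistered accumulators:
spark.sparkContext.parallelize(1 to 10).foreach(i => unregistered.add(i))
// org.apache.spark.SparkException: Task not serializable
// Caused by: java.lang.UnsupportedOperationException:
//   Accumulator must be registered before send to executor
```

`uncacheTable` hit this same path because the cached plan still held a reference to the now-unregistered `batchStats` accumulator.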

Notice that a `DataFrame` uncached with `DataFrame.unpersist()` keeps working, but one uncached with `spark.catalog.uncacheTable` doesn't. See the contrasting sketch below.
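
For contrast, a minimal sketch of the variant that keeps working, assuming the same `testData` table and test helpers (`checkAnswer`, `Row`) as the repro above:

```scala
val selectStar = sql("SELECT * FROM testData WHERE key = 1")
selectStar.persist()                       // cache via the DataFrame API
checkAnswer(selectStar, Seq(Row(1, "1")))  // materialize the cache

selectStar.unpersist()                     // uncache via the DataFrame API
checkAnswer(selectStar, Seq(Row(1, "1")))  // executes fine after unpersist()
```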

This PR reverts the part of cf38fe04 that unregisters the `batchStats` accumulator. Unregistering it there is unnecessary because `ContextCleaner` will do it once the accumulator is collected by GC.
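
A rough sketch of the shape of the revert (illustrative only, not the verbatim diff; the method and field names below stand in for the internal cleanup path of the cached plan):

```scala
// Illustrative cleanup path for a cached plan, not the verbatim Spark code:
private[sql] def uncache(blocking: Boolean): Unit = {
  // Reverted by this PR: eagerly unregistering the accumulator, roughly
  //   AccumulatorContext.remove(batchStats.id)
  // Any plan still referencing the now-unregistered batchStats accumulator
  // could no longer be serialized to executors ("Task not serializable").

  // Dropping the cached buffers is sufficient; ContextCleaner unregisters
  // the accumulator automatically once it is garbage-collected.
  cachedColumnBuffers.unpersist(blocking)
}
```

Leaving the cleanup to `ContextCleaner` matches how accumulators are handled elsewhere: the cleaner tracks them through weak references and unregisters them after GC, so no explicit removal is needed on the uncache path.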

## How was this patch tested?

Added a test that checks the `DataFrame` can execute after `uncacheTable`, and ran the other existing tests.
The test that checks whether the accumulator was cleared is marked `ignore` because it would be flaky.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #13596 from ueshin/issues/SPARK-15870.
Parent: 20b8f2c3